Skip to content

Commit 766b29d

Browse files
committed
Fix rebase failures
Now that we're wrapping the connection in our own smart pointer that doesn't implement `Connection` we need some explicit derefs (adding the manual `Connection` impl isn't worth avoiding these `*`s). We need to be able to clone the connection pool (only in tests, but this also only requires modifying the test variant), so we need the `Arc`. Similarly, since in tests the connection is a re-entrant mutex, we can't grab the connection before spawning the worker thread. The lock isn't `Send` that's for a very good reason. So we instead need to clone a handle to the pool and grab the connection on the thread we intend to use it.
1 parent 65539fd commit 766b29d

File tree

3 files changed

+17
-11
lines changed

3 files changed

+17
-11
lines changed

src/background/runner.rs

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -79,8 +79,10 @@ impl<Env: RefUnwindSafe + Send + Sync + 'static> Runner<Env> {
7979
where
8080
F: FnOnce(storage::BackgroundJob) -> CargoResult<()> + Send + UnwindSafe + 'static,
8181
{
82-
let conn = self.connection().expect("Could not acquire connection");
82+
// The connection may not be `Send` so we need to clone the pool instead
83+
let pool = self.connection_pool.clone();
8384
self.thread_pool.execute(move || {
85+
let conn = pool.get().expect("Could not acquire connection");
8486
conn.transaction::<_, Box<dyn CargoError>, _>(|| {
8587
let job = storage::find_next_unlocked_job(&conn).optional()?;
8688
let job = match job {
@@ -192,7 +194,7 @@ mod tests {
192194

193195
let remaining_jobs = background_jobs
194196
.count()
195-
.get_result(&runner.connection().unwrap());
197+
.get_result(&*runner.connection().unwrap());
196198
assert_eq!(Ok(0), remaining_jobs);
197199
}
198200

@@ -223,15 +225,15 @@ mod tests {
223225
.select(id)
224226
.filter(retries.eq(0))
225227
.for_update()
226-
.load::<i64>(&conn)
228+
.load::<i64>(&*conn)
227229
.unwrap();
228230
assert_eq!(0, available_jobs.len());
229231

230232
// Sanity check to make sure the job actually is there
231233
let total_jobs_including_failed = background_jobs
232234
.select(id)
233235
.for_update()
234-
.load::<i64>(&conn)
236+
.load::<i64>(&*conn)
235237
.unwrap();
236238
assert_eq!(1, total_jobs_including_failed.len());
237239

@@ -251,7 +253,7 @@ mod tests {
251253
.find(job_id)
252254
.select(retries)
253255
.for_update()
254-
.first::<i32>(&runner.connection().unwrap())
256+
.first::<i32>(&*runner.connection().unwrap())
255257
.unwrap();
256258
assert_eq!(1, tries);
257259
}
@@ -277,7 +279,7 @@ mod tests {
277279
impl<'a> Drop for TestGuard<'a> {
278280
fn drop(&mut self) {
279281
::diesel::sql_query("TRUNCATE TABLE background_jobs")
280-
.execute(&runner().connection().unwrap())
282+
.execute(&*runner().connection().unwrap())
281283
.unwrap();
282284
}
283285
}
@@ -290,14 +292,14 @@ mod tests {
290292
let manager = r2d2::ConnectionManager::new(database_url);
291293
let pool = r2d2::Pool::builder().max_size(2).build(manager).unwrap();
292294

293-
Runner::builder(pool, ()).thread_count(2).build()
295+
Runner::builder(DieselPool::Pool(pool), ()).thread_count(2).build()
294296
}
295297

296298
fn create_dummy_job(runner: &Runner<()>) -> storage::BackgroundJob {
297299
::diesel::insert_into(background_jobs)
298300
.values((job_type.eq("Foo"), data.eq(json!(null))))
299301
.returning((id, job_type, data))
300-
.get_result(&runner.connection().unwrap())
302+
.get_result(&*runner.connection().unwrap())
301303
.unwrap()
302304
}
303305
}

src/bin/background-worker.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ fn main() {
3131

3232
// We're only using 1 thread, so we only need 1 connection
3333
let db_config = r2d2::Pool::builder().max_size(1);
34-
let db_pool = db::diesel_pool(&config.db_url, db_config);
34+
let db_pool = db::diesel_pool(&config.db_url, config.env, db_config);
3535

3636
let builder = background::Runner::builder(db_pool, environment).thread_count(1);
3737
let runner = job_runner(builder);

src/db.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ use conduit::Request;
44
use diesel::prelude::*;
55
use diesel::r2d2::{self, ConnectionManager, CustomizeConnection};
66
use parking_lot::{ReentrantMutex, ReentrantMutexGuard};
7+
use std::sync::Arc;
78
use std::ops::Deref;
89
use url::Url;
910

@@ -12,9 +13,10 @@ use crate::util::CargoResult;
1213
use crate::Env;
1314

1415
#[allow(missing_debug_implementations)]
16+
#[derive(Clone)]
1517
pub enum DieselPool {
1618
Pool(r2d2::Pool<ConnectionManager<PgConnection>>),
17-
Test(ReentrantMutex<PgConnection>),
19+
Test(Arc<ReentrantMutex<PgConnection>>),
1820
}
1921

2022
impl DieselPool {
@@ -33,7 +35,7 @@ impl DieselPool {
3335
}
3436

3537
fn test_conn(conn: PgConnection) -> Self {
36-
DieselPool::Test(ReentrantMutex::new(conn))
38+
DieselPool::Test(Arc::new(ReentrantMutex::new(conn)))
3739
}
3840
}
3941

@@ -43,6 +45,8 @@ pub enum DieselPooledConn<'a> {
4345
Test(ReentrantMutexGuard<'a, PgConnection>),
4446
}
4547

48+
unsafe impl<'a> Send for DieselPooledConn<'a> {}
49+
4650
impl Deref for DieselPooledConn<'_> {
4751
type Target = PgConnection;
4852

0 commit comments

Comments
 (0)