Skip to content

Commit f0450f4

Browse files
jtgeibelTurbo87
authored andcommitted
Ensure transactions are rolled back if background worker panics
1 parent 9fdb5ae commit f0450f4

File tree

1 file changed

+32
-4
lines changed

1 file changed

+32
-4
lines changed

src/swirl/runner.rs

Lines changed: 32 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use diesel::connection::{AnsiTransactionManager, TransactionManager};
12
use diesel::prelude::*;
23
use diesel::r2d2;
34
use diesel::r2d2::ConnectionManager;
@@ -156,6 +157,16 @@ impl Runner {
156157
};
157158
let job_id = job.id;
158159

160+
let transaction_manager = conn.transaction_manager();
161+
let initial_depth = <AnsiTransactionManager as TransactionManager<
162+
PgConnection,
163+
>>::get_transaction_depth(
164+
transaction_manager
165+
);
166+
if initial_depth != 1 {
167+
warn!("Initial transaction depth is not 1. This is very unexpected");
168+
}
169+
159170
let result = conn
160171
.transaction(|| {
161172
let conn = AssertUnwindSafe(conn);
@@ -169,6 +180,21 @@ impl Runner {
169180
// TODO: Replace with flatten() once that stabilizes
170181
.and_then(std::convert::identity);
171182

183+
loop {
184+
let depth = <AnsiTransactionManager as TransactionManager<
185+
PgConnection,
186+
>>::get_transaction_depth(
187+
transaction_manager
188+
);
189+
if depth == initial_depth {
190+
break;
191+
}
192+
warn!("Rolling back a transaction due to a panic in a background task");
193+
let _ = transaction_manager
194+
.rollback_transaction(conn)
195+
.map_err(|e| error!("Error while rolling back transaction: {e}"));
196+
}
197+
172198
match result {
173199
Ok(_) => storage::delete_successful_job(conn, job_id)?,
174200
Err(e) => {
@@ -310,10 +336,12 @@ mod tests {
310336
let barrier = Arc::new(AssertUnwindSafe(Barrier::new(2)));
311337
let barrier2 = barrier.clone();
312338

313-
runner.get_single_job(dummy_sender(), move |_, _| {
314-
barrier.0.wait();
315-
// The job should go back into the queue after a panic
316-
panic!();
339+
runner.get_single_job(dummy_sender(), move |_, conn| {
340+
conn.transaction(|| {
341+
barrier.0.wait();
342+
// The job should go back into the queue after a panic
343+
panic!();
344+
})
317345
});
318346

319347
let conn = &*runner.connection().unwrap();

0 commit comments

Comments
 (0)