1
+ use diesel:: connection:: { AnsiTransactionManager , TransactionManager } ;
1
2
use diesel:: prelude:: * ;
2
3
use diesel:: r2d2;
3
4
use diesel:: r2d2:: ConnectionManager ;
@@ -156,6 +157,16 @@ impl Runner {
156
157
} ;
157
158
let job_id = job. id ;
158
159
160
+ let transaction_manager = conn. transaction_manager ( ) ;
161
+ let initial_depth = <AnsiTransactionManager as TransactionManager <
162
+ PgConnection ,
163
+ > >:: get_transaction_depth (
164
+ transaction_manager
165
+ ) ;
166
+ if initial_depth != 1 {
167
+ warn ! ( "Initial transaction depth is not 1. This is very unexpected" ) ;
168
+ }
169
+
159
170
let result = conn
160
171
. transaction ( || {
161
172
let conn = AssertUnwindSafe ( conn) ;
@@ -169,6 +180,21 @@ impl Runner {
169
180
// TODO: Replace with flatten() once that stabilizes
170
181
. and_then ( std:: convert:: identity) ;
171
182
183
+ loop {
184
+ let depth = <AnsiTransactionManager as TransactionManager <
185
+ PgConnection ,
186
+ > >:: get_transaction_depth (
187
+ transaction_manager
188
+ ) ;
189
+ if depth == initial_depth {
190
+ break ;
191
+ }
192
+ warn ! ( "Rolling back a transaction due to a panic in a background task" ) ;
193
+ let _ = transaction_manager
194
+ . rollback_transaction ( conn)
195
+ . map_err ( |e| error ! ( "Error while rolling back transaction: {e}" ) ) ;
196
+ }
197
+
172
198
match result {
173
199
Ok ( _) => storage:: delete_successful_job ( conn, job_id) ?,
174
200
Err ( e) => {
@@ -310,10 +336,12 @@ mod tests {
310
336
let barrier = Arc :: new ( AssertUnwindSafe ( Barrier :: new ( 2 ) ) ) ;
311
337
let barrier2 = barrier. clone ( ) ;
312
338
313
- runner. get_single_job ( dummy_sender ( ) , move |_, _| {
314
- barrier. 0 . wait ( ) ;
315
- // The job should go back into the queue after a panic
316
- panic ! ( ) ;
339
+ runner. get_single_job ( dummy_sender ( ) , move |_, conn| {
340
+ conn. transaction ( || {
341
+ barrier. 0 . wait ( ) ;
342
+ // The job should go back into the queue after a panic
343
+ panic ! ( ) ;
344
+ } )
317
345
} ) ;
318
346
319
347
let conn = & * runner. connection ( ) . unwrap ( ) ;
0 commit comments