Skip to content

Commit 6006b9c

Browse files
sfacklerconradludgate
authored andcommitted
Fix cancellation of TransactionBuilder::start
1 parent e94aaff commit 6006b9c

File tree

2 files changed

+41
-41
lines changed

2 files changed

+41
-41
lines changed

tokio-postgres/src/client.rs

Lines changed: 3 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use crate::codec::{BackendMessages, FrontendMessage};
1+
use crate::codec::BackendMessages;
22
use crate::config::SslMode;
33
use crate::connection::{Request, RequestMessages};
44
use crate::copy_both::CopyBothDuplex;
@@ -23,7 +23,7 @@ use fallible_iterator::FallibleIterator;
2323
use futures_channel::mpsc;
2424
use futures_util::{future, pin_mut, ready, StreamExt, TryStreamExt};
2525
use parking_lot::Mutex;
26-
use postgres_protocol::message::{backend::Message, frontend};
26+
use postgres_protocol::message::backend::Message;
2727
use postgres_types::BorrowToSql;
2828
use std::collections::HashMap;
2929
use std::fmt;
@@ -493,43 +493,7 @@ impl Client {
493493
///
494494
/// The transaction will roll back by default - use the `commit` method to commit it.
495495
pub async fn transaction(&mut self) -> Result<Transaction<'_>, Error> {
496-
struct RollbackIfNotDone<'me> {
497-
client: &'me Client,
498-
done: bool,
499-
}
500-
501-
impl<'a> Drop for RollbackIfNotDone<'a> {
502-
fn drop(&mut self) {
503-
if self.done {
504-
return;
505-
}
506-
507-
let buf = self.client.inner().with_buf(|buf| {
508-
frontend::query("ROLLBACK", buf).unwrap();
509-
buf.split().freeze()
510-
});
511-
let _ = self
512-
.client
513-
.inner()
514-
.send(RequestMessages::Single(FrontendMessage::Raw(buf)));
515-
}
516-
}
517-
518-
// This is done, as `Future` created by this method can be dropped after
519-
// `RequestMessages` is synchronously send to the `Connection` by
520-
// `batch_execute()`, but before `Responses` is asynchronously polled to
521-
// completion. In that case `Transaction` won't be created and thus
522-
// won't be rolled back.
523-
{
524-
let mut cleaner = RollbackIfNotDone {
525-
client: self,
526-
done: false,
527-
};
528-
self.batch_execute("BEGIN").await?;
529-
cleaner.done = true;
530-
}
531-
532-
Ok(Transaction::new(self))
496+
self.build_transaction().start().await
533497
}
534498

535499
/// Returns a builder for a transaction with custom settings.

tokio-postgres/src/transaction_builder.rs

Lines changed: 38 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
1-
use crate::{Client, Error, Transaction};
1+
use postgres_protocol::message::frontend;
2+
3+
use crate::{codec::FrontendMessage, connection::RequestMessages, Client, Error, Transaction};
24

35
/// The isolation level of a database transaction.
46
#[derive(Debug, Copy, Clone)]
@@ -106,7 +108,41 @@ impl<'a> TransactionBuilder<'a> {
106108
query.push_str(s);
107109
}
108110

109-
self.client.batch_execute(&query).await?;
111+
struct RollbackIfNotDone<'me> {
112+
client: &'me Client,
113+
done: bool,
114+
}
115+
116+
impl<'a> Drop for RollbackIfNotDone<'a> {
117+
fn drop(&mut self) {
118+
if self.done {
119+
return;
120+
}
121+
122+
let buf = self.client.inner().with_buf(|buf| {
123+
frontend::query("ROLLBACK", buf).unwrap();
124+
buf.split().freeze()
125+
});
126+
let _ = self
127+
.client
128+
.inner()
129+
.send(RequestMessages::Single(FrontendMessage::Raw(buf)));
130+
}
131+
}
132+
133+
// This is done as `Future` created by this method can be dropped after
134+
// `RequestMessages` is synchronously send to the `Connection` by
135+
// `batch_execute()`, but before `Responses` is asynchronously polled to
136+
// completion. In that case `Transaction` won't be created and thus
137+
// won't be rolled back.
138+
{
139+
let mut cleaner = RollbackIfNotDone {
140+
client: self.client,
141+
done: false,
142+
};
143+
self.client.batch_execute(&query).await?;
144+
cleaner.done = true;
145+
}
110146

111147
Ok(Transaction::new(self.client))
112148
}

0 commit comments

Comments
 (0)