Skip to content

Commit a2d0652

Browse files
simple query ready for query (#22)
* add ready_status on simple queries * add correct socket2 features
1 parent b25e7f3 commit a2d0652

File tree

9 files changed

+74
-24
lines changed

9 files changed

+74
-24
lines changed

postgres/src/client.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -439,7 +439,9 @@ impl Client {
439439
/// functionality to safely embed that data in the request. Do not form statements via string concatenation and pass
440440
/// them to this method!
441441
pub fn batch_execute(&mut self, query: &str) -> Result<(), Error> {
442-
self.connection.block_on(self.client.batch_execute(query))
442+
self.connection
443+
.block_on(self.client.batch_execute(query))
444+
.map(|_| ())
443445
}
444446

445447
/// Begins a new database transaction.

postgres/src/transaction.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ impl<'a> Transaction<'a> {
3535
pub fn commit(mut self) -> Result<(), Error> {
3636
self.connection
3737
.block_on(self.transaction.take().unwrap().commit())
38+
.map(|_| ())
3839
}
3940

4041
/// Rolls the transaction back, discarding all changes made within it.
@@ -43,6 +44,7 @@ impl<'a> Transaction<'a> {
4344
pub fn rollback(mut self) -> Result<(), Error> {
4445
self.connection
4546
.block_on(self.transaction.take().unwrap().rollback())
47+
.map(|_| ())
4648
}
4749

4850
/// Like `Client::prepare`.
@@ -193,6 +195,7 @@ impl<'a> Transaction<'a> {
193195
pub fn batch_execute(&mut self, query: &str) -> Result<(), Error> {
194196
self.connection
195197
.block_on(self.transaction.as_ref().unwrap().batch_execute(query))
198+
.map(|_| ())
196199
}
197200

198201
/// Like `Client::cancel_token`.

tokio-postgres/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ pin-project-lite = "0.2"
5555
phf = "0.11"
5656
postgres-protocol = { version = "0.6.4", path = "../postgres-protocol" }
5757
postgres-types = { version = "0.2.4", path = "../postgres-types" }
58-
socket2 = "0.4"
58+
socket2 = { version = "0.5", features = ["all"] }
5959
tokio = { version = "1.0", features = ["io-util"] }
6060
tokio-util = { version = "0.7", features = ["codec"] }
6161

tokio-postgres/src/client.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,8 @@ use crate::types::{Oid, ToSql, Type};
1717
use crate::Socket;
1818
use crate::{
1919
copy_both, copy_in, copy_out, prepare, query, simple_query, slice_iter, CancelToken,
20-
CopyInSink, Error, Row, SimpleQueryMessage, Statement, ToStatement, Transaction,
21-
TransactionBuilder,
20+
CopyInSink, Error, ReadyForQueryStatus, Row, SimpleQueryMessage, Statement, ToStatement,
21+
Transaction, TransactionBuilder,
2222
};
2323
use bytes::{Buf, BytesMut};
2424
use fallible_iterator::FallibleIterator;
@@ -526,7 +526,7 @@ impl Client {
526526
/// Prepared statements should be use for any query which contains user-specified data, as they provided the
527527
/// functionality to safely embed that data in the request. Do not form statements via string concatenation and pass
528528
/// them to this method!
529-
pub async fn batch_execute(&self, query: &str) -> Result<(), Error> {
529+
pub async fn batch_execute(&self, query: &str) -> Result<ReadyForQueryStatus, Error> {
530530
simple_query::batch_execute(self.inner(), query).await
531531
}
532532

tokio-postgres/src/lib.rs

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,8 @@
119119
#![doc(html_root_url = "https://docs.rs/tokio-postgres/0.7")]
120120
#![warn(rust_2018_idioms, clippy::all, missing_docs)]
121121

122+
use postgres_protocol::message::backend::ReadyForQueryBody;
123+
122124
pub use crate::cancel_token::CancelToken;
123125
pub use crate::client::Client;
124126
pub use crate::config::Config;
@@ -143,6 +145,31 @@ pub use crate::transaction::Transaction;
143145
pub use crate::transaction_builder::{IsolationLevel, TransactionBuilder};
144146
use crate::types::ToSql;
145147

148+
/// After executing a query, the connection will be in one of these states
149+
#[derive(Clone, Copy, Debug, PartialEq)]
150+
#[repr(u8)]
151+
pub enum ReadyForQueryStatus {
152+
/// Connection state is unknown
153+
Unknown,
154+
/// Connection is idle (no transactions)
155+
Idle = b'I',
156+
/// Connection is in a transaction block
157+
Transaction = b'T',
158+
/// Connection is in a failed transaction block
159+
FailedTransaction = b'E',
160+
}
161+
162+
impl From<ReadyForQueryBody> for ReadyForQueryStatus {
163+
fn from(value: ReadyForQueryBody) -> Self {
164+
match value.status() {
165+
b'I' => Self::Idle,
166+
b'T' => Self::Transaction,
167+
b'E' => Self::FailedTransaction,
168+
_ => Self::Unknown,
169+
}
170+
}
171+
}
172+
146173
pub mod binary_copy;
147174
mod bind;
148175
#[cfg(feature = "runtime")]

tokio-postgres/src/query.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use crate::client::{InnerClient, Responses};
22
use crate::codec::FrontendMessage;
33
use crate::connection::RequestMessages;
44
use crate::types::{BorrowToSql, IsNull};
5-
use crate::{Error, Portal, Row, Statement};
5+
use crate::{Error, Portal, ReadyForQueryStatus, Row, Statement};
66
use bytes::{BufMut, Bytes, BytesMut};
77
use futures_util::{ready, Stream};
88
use log::{debug, log_enabled, Level};
@@ -55,7 +55,7 @@ where
5555
statement,
5656
responses,
5757
command_tag: None,
58-
status: None,
58+
status: ReadyForQueryStatus::Unknown,
5959
output_format: Format::Binary,
6060
_p: PhantomPinned,
6161
})
@@ -109,7 +109,7 @@ where
109109
statement,
110110
responses,
111111
command_tag: None,
112-
status: None,
112+
status: ReadyForQueryStatus::Unknown,
113113
output_format: Format::Text,
114114
_p: PhantomPinned,
115115
})
@@ -132,7 +132,7 @@ pub async fn query_portal(
132132
statement: portal.statement().clone(),
133133
responses,
134134
command_tag: None,
135-
status: None,
135+
status: ReadyForQueryStatus::Unknown,
136136
output_format: Format::Binary,
137137
_p: PhantomPinned,
138138
})
@@ -266,7 +266,7 @@ pin_project! {
266266
responses: Responses,
267267
command_tag: Option<String>,
268268
output_format: Format,
269-
status: Option<u8>,
269+
status: ReadyForQueryStatus,
270270
#[pin]
271271
_p: PhantomPinned,
272272
}
@@ -293,7 +293,7 @@ impl Stream for RowStream {
293293
}
294294
}
295295
Message::ReadyForQuery(status) => {
296-
*this.status = Some(status.status());
296+
*this.status = status.into();
297297
return Poll::Ready(None);
298298
}
299299
_ => return Poll::Ready(Some(Err(Error::unexpected_message()))),
@@ -313,7 +313,7 @@ impl RowStream {
313313
/// Returns if the connection is ready for querying, with the status of the connection.
314314
///
315315
/// This might be available only after the stream has been exhausted.
316-
pub fn ready_status(&self) -> Option<u8> {
316+
pub fn ready_status(&self) -> ReadyForQueryStatus {
317317
self.status
318318
}
319319
}

tokio-postgres/src/simple_query.rs

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use crate::client::{InnerClient, Responses};
22
use crate::codec::FrontendMessage;
33
use crate::connection::RequestMessages;
4-
use crate::{Error, SimpleQueryMessage, SimpleQueryRow};
4+
use crate::{Error, ReadyForQueryStatus, SimpleQueryMessage, SimpleQueryRow};
55
use bytes::Bytes;
66
use fallible_iterator::FallibleIterator;
77
use futures_util::{ready, Stream};
@@ -40,19 +40,23 @@ pub async fn simple_query(client: &InnerClient, query: &str) -> Result<SimpleQue
4040
Ok(SimpleQueryStream {
4141
responses,
4242
columns: None,
43+
status: ReadyForQueryStatus::Unknown,
4344
_p: PhantomPinned,
4445
})
4546
}
4647

47-
pub async fn batch_execute(client: &InnerClient, query: &str) -> Result<(), Error> {
48+
pub async fn batch_execute(
49+
client: &InnerClient,
50+
query: &str,
51+
) -> Result<ReadyForQueryStatus, Error> {
4852
debug!("executing statement batch: {}", query);
4953

5054
let buf = encode(client, query)?;
5155
let mut responses = client.send(RequestMessages::Single(FrontendMessage::Raw(buf)))?;
5256

5357
loop {
5458
match responses.next().await? {
55-
Message::ReadyForQuery(_) => return Ok(()),
59+
Message::ReadyForQuery(status) => return Ok(status.into()),
5660
Message::CommandComplete(_)
5761
| Message::EmptyQueryResponse
5862
| Message::RowDescription(_)
@@ -74,11 +78,21 @@ pin_project! {
7478
pub struct SimpleQueryStream {
7579
responses: Responses,
7680
columns: Option<Arc<[SimpleColumn]>>,
81+
status: ReadyForQueryStatus,
7782
#[pin]
7883
_p: PhantomPinned,
7984
}
8085
}
8186

87+
impl SimpleQueryStream {
88+
/// Returns if the connection is ready for querying, with the status of the connection.
89+
///
90+
/// This might be available only after the stream has been exhausted.
91+
pub fn ready_status(&self) -> ReadyForQueryStatus {
92+
self.status
93+
}
94+
}
95+
8296
impl Stream for SimpleQueryStream {
8397
type Item = Result<SimpleQueryMessage, Error>;
8498

@@ -117,7 +131,10 @@ impl Stream for SimpleQueryStream {
117131
};
118132
return Poll::Ready(Some(Ok(SimpleQueryMessage::Row(row))));
119133
}
120-
Message::ReadyForQuery(_) => return Poll::Ready(None),
134+
Message::ReadyForQuery(s) => {
135+
*this.status = s.into();
136+
return Poll::Ready(None);
137+
}
121138
_ => return Poll::Ready(Some(Err(Error::unexpected_message()))),
122139
}
123140
}

tokio-postgres/src/transaction.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,8 @@ use crate::types::{BorrowToSql, ToSql, Type};
99
#[cfg(feature = "runtime")]
1010
use crate::Socket;
1111
use crate::{
12-
bind, query, slice_iter, CancelToken, Client, CopyInSink, Error, Portal, Row,
13-
SimpleQueryMessage, Statement, ToStatement,
12+
bind, query, slice_iter, CancelToken, Client, CopyInSink, Error, Portal, ReadyForQueryStatus,
13+
Row, SimpleQueryMessage, Statement, ToStatement,
1414
};
1515
use bytes::Buf;
1616
use futures_util::TryStreamExt;
@@ -65,7 +65,7 @@ impl<'a> Transaction<'a> {
6565
}
6666

6767
/// Consumes the transaction, committing all changes made within it.
68-
pub async fn commit(mut self) -> Result<(), Error> {
68+
pub async fn commit(mut self) -> Result<ReadyForQueryStatus, Error> {
6969
self.done = true;
7070
let query = if let Some(sp) = self.savepoint.as_ref() {
7171
format!("RELEASE {}", sp.name)
@@ -78,7 +78,7 @@ impl<'a> Transaction<'a> {
7878
/// Rolls the transaction back, discarding all changes made within it.
7979
///
8080
/// This is equivalent to `Transaction`'s `Drop` implementation, but provides any error encountered to the caller.
81-
pub async fn rollback(mut self) -> Result<(), Error> {
81+
pub async fn rollback(mut self) -> Result<ReadyForQueryStatus, Error> {
8282
self.done = true;
8383
let query = if let Some(sp) = self.savepoint.as_ref() {
8484
format!("ROLLBACK TO {}", sp.name)
@@ -261,7 +261,7 @@ impl<'a> Transaction<'a> {
261261
}
262262

263263
/// Like `Client::batch_execute`.
264-
pub async fn batch_execute(&self, query: &str) -> Result<(), Error> {
264+
pub async fn batch_execute(&self, query: &str) -> Result<ReadyForQueryStatus, Error> {
265265
self.client.batch_execute(query).await
266266
}
267267

tokio-postgres/tests/test/main.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,8 @@ use tokio_postgres::error::SqlState;
1616
use tokio_postgres::tls::{NoTls, NoTlsStream};
1717
use tokio_postgres::types::{Kind, Type};
1818
use tokio_postgres::{
19-
AsyncMessage, Client, Config, Connection, Error, IsolationLevel, SimpleQueryMessage,
19+
AsyncMessage, Client, Config, Connection, Error, IsolationLevel, ReadyForQueryStatus,
20+
SimpleQueryMessage,
2021
};
2122

2223
mod binary_copy;
@@ -365,7 +366,7 @@ async fn ready_for_query() {
365366
pin_mut!(row_stream);
366367
while row_stream.next().await.is_none() {}
367368

368-
assert_eq!(row_stream.ready_status(), Some(b'T'));
369+
assert_eq!(row_stream.ready_status(), ReadyForQueryStatus::Transaction);
369370

370371
let row_stream = client
371372
.query_raw_txt("ROLLBACK", [] as [Option<&str>; 0])
@@ -375,7 +376,7 @@ async fn ready_for_query() {
375376
pin_mut!(row_stream);
376377
while row_stream.next().await.is_none() {}
377378

378-
assert_eq!(row_stream.ready_status(), Some(b'I'));
379+
assert_eq!(row_stream.ready_status(), ReadyForQueryStatus::Idle);
379380
}
380381

381382
#[tokio::test]

0 commit comments

Comments
 (0)