Skip to content

simple query ready for query #22

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Aug 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion postgres/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -439,7 +439,9 @@ impl Client {
/// functionality to safely embed that data in the request. Do not form statements via string concatenation and pass
/// them to this method!
pub fn batch_execute(&mut self, query: &str) -> Result<(), Error> {
self.connection.block_on(self.client.batch_execute(query))
self.connection
.block_on(self.client.batch_execute(query))
.map(|_| ())
}

/// Begins a new database transaction.
Expand Down
3 changes: 3 additions & 0 deletions postgres/src/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ impl<'a> Transaction<'a> {
pub fn commit(mut self) -> Result<(), Error> {
self.connection
.block_on(self.transaction.take().unwrap().commit())
.map(|_| ())
}

/// Rolls the transaction back, discarding all changes made within it.
Expand All @@ -43,6 +44,7 @@ impl<'a> Transaction<'a> {
pub fn rollback(mut self) -> Result<(), Error> {
self.connection
.block_on(self.transaction.take().unwrap().rollback())
.map(|_| ())
}

/// Like `Client::prepare`.
Expand Down Expand Up @@ -193,6 +195,7 @@ impl<'a> Transaction<'a> {
pub fn batch_execute(&mut self, query: &str) -> Result<(), Error> {
self.connection
.block_on(self.transaction.as_ref().unwrap().batch_execute(query))
.map(|_| ())
}

/// Like `Client::cancel_token`.
Expand Down
2 changes: 1 addition & 1 deletion tokio-postgres/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ pin-project-lite = "0.2"
phf = "0.11"
postgres-protocol = { version = "0.6.4", path = "../postgres-protocol" }
postgres-types = { version = "0.2.4", path = "../postgres-types" }
socket2 = "0.4"
socket2 = { version = "0.5", features = ["all"] }
tokio = { version = "1.0", features = ["io-util"] }
tokio-util = { version = "0.7", features = ["codec"] }

Expand Down
6 changes: 3 additions & 3 deletions tokio-postgres/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ use crate::types::{Oid, ToSql, Type};
use crate::Socket;
use crate::{
copy_both, copy_in, copy_out, prepare, query, simple_query, slice_iter, CancelToken,
CopyInSink, Error, Row, SimpleQueryMessage, Statement, ToStatement, Transaction,
TransactionBuilder,
CopyInSink, Error, ReadyForQueryStatus, Row, SimpleQueryMessage, Statement, ToStatement,
Transaction, TransactionBuilder,
};
use bytes::{Buf, BytesMut};
use fallible_iterator::FallibleIterator;
Expand Down Expand Up @@ -526,7 +526,7 @@ impl Client {
/// Prepared statements should be use for any query which contains user-specified data, as they provided the
/// functionality to safely embed that data in the request. Do not form statements via string concatenation and pass
/// them to this method!
pub async fn batch_execute(&self, query: &str) -> Result<(), Error> {
pub async fn batch_execute(&self, query: &str) -> Result<ReadyForQueryStatus, Error> {
simple_query::batch_execute(self.inner(), query).await
}

Expand Down
27 changes: 27 additions & 0 deletions tokio-postgres/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,8 @@
#![doc(html_root_url = "https://docs.rs/tokio-postgres/0.7")]
#![warn(rust_2018_idioms, clippy::all, missing_docs)]

use postgres_protocol::message::backend::ReadyForQueryBody;

pub use crate::cancel_token::CancelToken;
pub use crate::client::Client;
pub use crate::config::Config;
Expand All @@ -143,6 +145,31 @@ pub use crate::transaction::Transaction;
pub use crate::transaction_builder::{IsolationLevel, TransactionBuilder};
use crate::types::ToSql;

/// After executing a query, the connection will be in one of these states
#[derive(Clone, Copy, Debug, PartialEq)]
#[repr(u8)]
pub enum ReadyForQueryStatus {
/// Connection state is unknown
Unknown,
/// Connection is idle (no transactions)
Idle = b'I',
/// Connection is in a transaction block
Transaction = b'T',
/// Connection is in a failed transaction block
FailedTransaction = b'E',
}

impl From<ReadyForQueryBody> for ReadyForQueryStatus {
fn from(value: ReadyForQueryBody) -> Self {
match value.status() {
b'I' => Self::Idle,
b'T' => Self::Transaction,
b'E' => Self::FailedTransaction,
_ => Self::Unknown,
}
}
}

pub mod binary_copy;
mod bind;
#[cfg(feature = "runtime")]
Expand Down
14 changes: 7 additions & 7 deletions tokio-postgres/src/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::client::{InnerClient, Responses};
use crate::codec::FrontendMessage;
use crate::connection::RequestMessages;
use crate::types::{BorrowToSql, IsNull};
use crate::{Error, Portal, Row, Statement};
use crate::{Error, Portal, ReadyForQueryStatus, Row, Statement};
use bytes::{BufMut, Bytes, BytesMut};
use futures_util::{ready, Stream};
use log::{debug, log_enabled, Level};
Expand Down Expand Up @@ -55,7 +55,7 @@ where
statement,
responses,
command_tag: None,
status: None,
status: ReadyForQueryStatus::Unknown,
output_format: Format::Binary,
_p: PhantomPinned,
})
Expand Down Expand Up @@ -109,7 +109,7 @@ where
statement,
responses,
command_tag: None,
status: None,
status: ReadyForQueryStatus::Unknown,
output_format: Format::Text,
_p: PhantomPinned,
})
Expand All @@ -132,7 +132,7 @@ pub async fn query_portal(
statement: portal.statement().clone(),
responses,
command_tag: None,
status: None,
status: ReadyForQueryStatus::Unknown,
output_format: Format::Binary,
_p: PhantomPinned,
})
Expand Down Expand Up @@ -266,7 +266,7 @@ pin_project! {
responses: Responses,
command_tag: Option<String>,
output_format: Format,
status: Option<u8>,
status: ReadyForQueryStatus,
#[pin]
_p: PhantomPinned,
}
Expand All @@ -293,7 +293,7 @@ impl Stream for RowStream {
}
}
Message::ReadyForQuery(status) => {
*this.status = Some(status.status());
*this.status = status.into();
return Poll::Ready(None);
}
_ => return Poll::Ready(Some(Err(Error::unexpected_message()))),
Expand All @@ -313,7 +313,7 @@ impl RowStream {
/// Returns if the connection is ready for querying, with the status of the connection.
///
/// This might be available only after the stream has been exhausted.
pub fn ready_status(&self) -> Option<u8> {
pub fn ready_status(&self) -> ReadyForQueryStatus {
self.status
}
}
25 changes: 21 additions & 4 deletions tokio-postgres/src/simple_query.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::client::{InnerClient, Responses};
use crate::codec::FrontendMessage;
use crate::connection::RequestMessages;
use crate::{Error, SimpleQueryMessage, SimpleQueryRow};
use crate::{Error, ReadyForQueryStatus, SimpleQueryMessage, SimpleQueryRow};
use bytes::Bytes;
use fallible_iterator::FallibleIterator;
use futures_util::{ready, Stream};
Expand Down Expand Up @@ -40,19 +40,23 @@ pub async fn simple_query(client: &InnerClient, query: &str) -> Result<SimpleQue
Ok(SimpleQueryStream {
responses,
columns: None,
status: ReadyForQueryStatus::Unknown,
_p: PhantomPinned,
})
}

pub async fn batch_execute(client: &InnerClient, query: &str) -> Result<(), Error> {
pub async fn batch_execute(
client: &InnerClient,
query: &str,
) -> Result<ReadyForQueryStatus, Error> {
debug!("executing statement batch: {}", query);

let buf = encode(client, query)?;
let mut responses = client.send(RequestMessages::Single(FrontendMessage::Raw(buf)))?;

loop {
match responses.next().await? {
Message::ReadyForQuery(_) => return Ok(()),
Message::ReadyForQuery(status) => return Ok(status.into()),
Message::CommandComplete(_)
| Message::EmptyQueryResponse
| Message::RowDescription(_)
Expand All @@ -74,11 +78,21 @@ pin_project! {
pub struct SimpleQueryStream {
responses: Responses,
columns: Option<Arc<[SimpleColumn]>>,
status: ReadyForQueryStatus,
#[pin]
_p: PhantomPinned,
}
}

impl SimpleQueryStream {
/// Returns if the connection is ready for querying, with the status of the connection.
///
/// This might be available only after the stream has been exhausted.
pub fn ready_status(&self) -> ReadyForQueryStatus {
self.status
}
}

impl Stream for SimpleQueryStream {
type Item = Result<SimpleQueryMessage, Error>;

Expand Down Expand Up @@ -117,7 +131,10 @@ impl Stream for SimpleQueryStream {
};
return Poll::Ready(Some(Ok(SimpleQueryMessage::Row(row))));
}
Message::ReadyForQuery(_) => return Poll::Ready(None),
Message::ReadyForQuery(s) => {
*this.status = s.into();
return Poll::Ready(None);
}
_ => return Poll::Ready(Some(Err(Error::unexpected_message()))),
}
}
Expand Down
10 changes: 5 additions & 5 deletions tokio-postgres/src/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ use crate::types::{BorrowToSql, ToSql, Type};
#[cfg(feature = "runtime")]
use crate::Socket;
use crate::{
bind, query, slice_iter, CancelToken, Client, CopyInSink, Error, Portal, Row,
SimpleQueryMessage, Statement, ToStatement,
bind, query, slice_iter, CancelToken, Client, CopyInSink, Error, Portal, ReadyForQueryStatus,
Row, SimpleQueryMessage, Statement, ToStatement,
};
use bytes::Buf;
use futures_util::TryStreamExt;
Expand Down Expand Up @@ -65,7 +65,7 @@ impl<'a> Transaction<'a> {
}

/// Consumes the transaction, committing all changes made within it.
pub async fn commit(mut self) -> Result<(), Error> {
pub async fn commit(mut self) -> Result<ReadyForQueryStatus, Error> {
self.done = true;
let query = if let Some(sp) = self.savepoint.as_ref() {
format!("RELEASE {}", sp.name)
Expand All @@ -78,7 +78,7 @@ impl<'a> Transaction<'a> {
/// Rolls the transaction back, discarding all changes made within it.
///
/// This is equivalent to `Transaction`'s `Drop` implementation, but provides any error encountered to the caller.
pub async fn rollback(mut self) -> Result<(), Error> {
pub async fn rollback(mut self) -> Result<ReadyForQueryStatus, Error> {
self.done = true;
let query = if let Some(sp) = self.savepoint.as_ref() {
format!("ROLLBACK TO {}", sp.name)
Expand Down Expand Up @@ -261,7 +261,7 @@ impl<'a> Transaction<'a> {
}

/// Like `Client::batch_execute`.
pub async fn batch_execute(&self, query: &str) -> Result<(), Error> {
pub async fn batch_execute(&self, query: &str) -> Result<ReadyForQueryStatus, Error> {
self.client.batch_execute(query).await
}

Expand Down
7 changes: 4 additions & 3 deletions tokio-postgres/tests/test/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ use tokio_postgres::error::SqlState;
use tokio_postgres::tls::{NoTls, NoTlsStream};
use tokio_postgres::types::{Kind, Type};
use tokio_postgres::{
AsyncMessage, Client, Config, Connection, Error, IsolationLevel, SimpleQueryMessage,
AsyncMessage, Client, Config, Connection, Error, IsolationLevel, ReadyForQueryStatus,
SimpleQueryMessage,
};

mod binary_copy;
Expand Down Expand Up @@ -365,7 +366,7 @@ async fn ready_for_query() {
pin_mut!(row_stream);
while row_stream.next().await.is_none() {}

assert_eq!(row_stream.ready_status(), Some(b'T'));
assert_eq!(row_stream.ready_status(), ReadyForQueryStatus::Transaction);

let row_stream = client
.query_raw_txt("ROLLBACK", [] as [Option<&str>; 0])
Expand All @@ -375,7 +376,7 @@ async fn ready_for_query() {
pin_mut!(row_stream);
while row_stream.next().await.is_none() {}

assert_eq!(row_stream.ready_status(), Some(b'I'));
assert_eq!(row_stream.ready_status(), ReadyForQueryStatus::Idle);
}

#[tokio::test]
Expand Down