Skip to content

Commit b25e7f3

Browse files
Connection changes (#21)
* refactor query_raw_txt to use a pre-prepared statement * expose ready_status on RowStream
1 parent 9011f71 commit b25e7f3

File tree

8 files changed

+104
-105
lines changed

8 files changed

+104
-105
lines changed

.github/workflows/ci.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ jobs:
5757
- run: docker compose up -d
5858
- uses: sfackler/actions/rustup@master
5959
with:
60-
version: 1.65.0
60+
version: 1.67.0
6161
- run: echo "::set-output name=version::$(rustc --version)"
6262
id: rust-version
6363
- uses: actions/cache@v1

tokio-postgres/src/client.rs

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -376,13 +376,19 @@ impl Client {
376376

377377
/// Pass text directly to the Postgres backend to allow it to sort out typing itself and
378378
/// to save a roundtrip
379-
pub async fn query_raw_txt<'a, S, I>(&self, query: S, params: I) -> Result<RowStream, Error>
379+
pub async fn query_raw_txt<'a, T, S, I>(
380+
&self,
381+
statement: &T,
382+
params: I,
383+
) -> Result<RowStream, Error>
380384
where
381-
S: AsRef<str> + Sync + Send,
385+
T: ?Sized + ToStatement,
386+
S: AsRef<str>,
382387
I: IntoIterator<Item = Option<S>>,
383-
I::IntoIter: ExactSizeIterator + Sync + Send,
388+
I::IntoIter: ExactSizeIterator,
384389
{
385-
query::query_txt(&self.inner, query, params).await
390+
let statement = statement.__convert().into_statement(self).await?;
391+
query::query_txt(&self.inner, statement, params).await
386392
}
387393

388394
/// Executes a statement, returning the number of rows modified.

tokio-postgres/src/generic_client.rs

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -57,8 +57,13 @@ pub trait GenericClient: private::Sealed {
5757
I::IntoIter: ExactSizeIterator;
5858

5959
/// Like `Client::query_raw_txt`.
60-
async fn query_raw_txt<'a, S, I>(&self, query: S, params: I) -> Result<RowStream, Error>
60+
async fn query_raw_txt<'a, T, S, I>(
61+
&self,
62+
statement: &T,
63+
params: I,
64+
) -> Result<RowStream, Error>
6165
where
66+
T: ?Sized + ToStatement + Sync + Send,
6267
S: AsRef<str> + Sync + Send,
6368
I: IntoIterator<Item = Option<S>> + Sync + Send,
6469
I::IntoIter: ExactSizeIterator + Sync + Send;
@@ -140,13 +145,14 @@ impl GenericClient for Client {
140145
self.query_raw(statement, params).await
141146
}
142147

143-
async fn query_raw_txt<'a, S, I>(&self, query: S, params: I) -> Result<RowStream, Error>
148+
async fn query_raw_txt<'a, T, S, I>(&self, statement: &T, params: I) -> Result<RowStream, Error>
144149
where
150+
T: ?Sized + ToStatement + Sync + Send,
145151
S: AsRef<str> + Sync + Send,
146152
I: IntoIterator<Item = Option<S>> + Sync + Send,
147153
I::IntoIter: ExactSizeIterator + Sync + Send,
148154
{
149-
self.query_raw_txt(query, params).await
155+
self.query_raw_txt(statement, params).await
150156
}
151157

152158
async fn prepare(&self, query: &str) -> Result<Statement, Error> {
@@ -231,13 +237,14 @@ impl GenericClient for Transaction<'_> {
231237
self.query_raw(statement, params).await
232238
}
233239

234-
async fn query_raw_txt<'a, S, I>(&self, query: S, params: I) -> Result<RowStream, Error>
240+
async fn query_raw_txt<'a, T, S, I>(&self, statement: &T, params: I) -> Result<RowStream, Error>
235241
where
242+
T: ?Sized + ToStatement + Sync + Send,
236243
S: AsRef<str> + Sync + Send,
237244
I: IntoIterator<Item = Option<S>> + Sync + Send,
238245
I::IntoIter: ExactSizeIterator + Sync + Send,
239246
{
240-
self.query_raw_txt(query, params).await
247+
self.query_raw_txt(statement, params).await
241248
}
242249

243250
async fn prepare(&self, query: &str) -> Result<Statement, Error> {

tokio-postgres/src/query.rs

Lines changed: 36 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,15 @@
11
use crate::client::{InnerClient, Responses};
22
use crate::codec::FrontendMessage;
33
use crate::connection::RequestMessages;
4-
use crate::prepare::get_type;
54
use crate::types::{BorrowToSql, IsNull};
6-
use crate::{Column, Error, Portal, Row, Statement};
5+
use crate::{Error, Portal, Row, Statement};
76
use bytes::{BufMut, Bytes, BytesMut};
8-
use fallible_iterator::FallibleIterator;
97
use futures_util::{ready, Stream};
108
use log::{debug, log_enabled, Level};
119
use pin_project_lite::pin_project;
1210
use postgres_protocol::message::backend::Message;
1311
use postgres_protocol::message::frontend;
14-
use postgres_types::Type;
12+
use postgres_types::Format;
1513
use std::fmt;
1614
use std::marker::PhantomPinned;
1715
use std::pin::Pin;
@@ -57,30 +55,29 @@ where
5755
statement,
5856
responses,
5957
command_tag: None,
58+
status: None,
59+
output_format: Format::Binary,
6060
_p: PhantomPinned,
6161
})
6262
}
6363

6464
pub async fn query_txt<S, I>(
6565
client: &Arc<InnerClient>,
66-
query: S,
66+
statement: Statement,
6767
params: I,
6868
) -> Result<RowStream, Error>
6969
where
70-
S: AsRef<str> + Sync + Send,
70+
S: AsRef<str>,
7171
I: IntoIterator<Item = Option<S>>,
7272
I::IntoIter: ExactSizeIterator,
7373
{
7474
let params = params.into_iter();
75-
let params_len = params.len();
7675

7776
let buf = client.with_buf(|buf| {
78-
// Parse, anonymous portal
79-
frontend::parse("", query.as_ref(), std::iter::empty(), buf).map_err(Error::encode)?;
8077
// Bind, pass params as text, retrieve as binary
8178
match frontend::bind(
8279
"", // empty string selects the unnamed portal
83-
"", // empty string selects the unnamed prepared statement
80+
statement.name(), // named prepared statement
8481
std::iter::empty(), // all parameters use the default format (text)
8582
params,
8683
|param, buf| match param {
@@ -98,8 +95,6 @@ where
9895
Err(frontend::BindError::Serialization(e)) => Err(Error::encode(e)),
9996
}?;
10097

101-
// Describe portal to typecast results
102-
frontend::describe(b'P', "", buf).map_err(Error::encode)?;
10398
// Execute
10499
frontend::execute("", 0, buf).map_err(Error::encode)?;
105100
// Sync
@@ -108,43 +103,16 @@ where
108103
Ok(buf.split().freeze())
109104
})?;
110105

111-
let mut responses = client.send(RequestMessages::Single(FrontendMessage::Raw(buf)))?;
112-
113106
// now read the responses
114-
115-
match responses.next().await? {
116-
Message::ParseComplete => {}
117-
_ => return Err(Error::unexpected_message()),
118-
}
119-
match responses.next().await? {
120-
Message::BindComplete => {}
121-
_ => return Err(Error::unexpected_message()),
122-
}
123-
let row_description = match responses.next().await? {
124-
Message::RowDescription(body) => Some(body),
125-
Message::NoData => None,
126-
_ => return Err(Error::unexpected_message()),
127-
};
128-
129-
// construct statement object
130-
131-
let parameters = vec![Type::UNKNOWN; params_len];
132-
133-
let mut columns = vec![];
134-
if let Some(row_description) = row_description {
135-
let mut it = row_description.fields();
136-
while let Some(field) = it.next().map_err(Error::parse)? {
137-
// NB: for some types that function may send a query to the server. At least in
138-
// raw text mode we don't need that info and can skip this.
139-
let type_ = get_type(client, field.type_oid()).await?;
140-
let column = Column::new(field.name().to_string(), type_, field);
141-
columns.push(column);
142-
}
143-
}
144-
145-
let statement = Statement::new_text(client, "".to_owned(), parameters, columns);
146-
147-
Ok(RowStream::new(statement, responses))
107+
let responses = start(client, buf).await?;
108+
Ok(RowStream {
109+
statement,
110+
responses,
111+
command_tag: None,
112+
status: None,
113+
output_format: Format::Text,
114+
_p: PhantomPinned,
115+
})
148116
}
149117

150118
pub async fn query_portal(
@@ -164,6 +132,8 @@ pub async fn query_portal(
164132
statement: portal.statement().clone(),
165133
responses,
166134
command_tag: None,
135+
status: None,
136+
output_format: Format::Binary,
167137
_p: PhantomPinned,
168138
})
169139
}
@@ -295,23 +265,13 @@ pin_project! {
295265
statement: Statement,
296266
responses: Responses,
297267
command_tag: Option<String>,
268+
output_format: Format,
269+
status: Option<u8>,
298270
#[pin]
299271
_p: PhantomPinned,
300272
}
301273
}
302274

303-
impl RowStream {
304-
/// Creates a new `RowStream`.
305-
pub fn new(statement: Statement, responses: Responses) -> Self {
306-
RowStream {
307-
statement,
308-
responses,
309-
command_tag: None,
310-
_p: PhantomPinned,
311-
}
312-
}
313-
}
314-
315275
impl Stream for RowStream {
316276
type Item = Result<Row, Error>;
317277

@@ -320,15 +280,22 @@ impl Stream for RowStream {
320280
loop {
321281
match ready!(this.responses.poll_next(cx)?) {
322282
Message::DataRow(body) => {
323-
return Poll::Ready(Some(Ok(Row::new(this.statement.clone(), body)?)))
283+
return Poll::Ready(Some(Ok(Row::new(
284+
this.statement.clone(),
285+
body,
286+
*this.output_format,
287+
)?)))
324288
}
325289
Message::EmptyQueryResponse | Message::PortalSuspended => {}
326290
Message::CommandComplete(body) => {
327291
if let Ok(tag) = body.tag() {
328292
*this.command_tag = Some(tag.to_string());
329293
}
330294
}
331-
Message::ReadyForQuery(_) => return Poll::Ready(None),
295+
Message::ReadyForQuery(status) => {
296+
*this.status = Some(status.status());
297+
return Poll::Ready(None);
298+
}
332299
_ => return Poll::Ready(Some(Err(Error::unexpected_message()))),
333300
}
334301
}
@@ -342,4 +309,11 @@ impl RowStream {
342309
pub fn command_tag(&self) -> Option<String> {
343310
self.command_tag.clone()
344311
}
312+
313+
/// Returns if the connection is ready for querying, with the status of the connection.
314+
///
315+
/// This might be available only after the stream has been exhausted.
316+
pub fn ready_status(&self) -> Option<u8> {
317+
self.status
318+
}
345319
}

tokio-postgres/src/row.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,7 @@ where
9898
/// A row of data returned from the database by a query.
9999
pub struct Row {
100100
statement: Statement,
101+
output_format: Format,
101102
body: DataRowBody,
102103
ranges: Vec<Option<Range<usize>>>,
103104
}
@@ -111,12 +112,17 @@ impl fmt::Debug for Row {
111112
}
112113

113114
impl Row {
114-
pub(crate) fn new(statement: Statement, body: DataRowBody) -> Result<Row, Error> {
115+
pub(crate) fn new(
116+
statement: Statement,
117+
body: DataRowBody,
118+
output_format: Format,
119+
) -> Result<Row, Error> {
115120
let ranges = body.ranges().collect().map_err(Error::parse)?;
116121
Ok(Row {
117122
statement,
118123
body,
119124
ranges,
125+
output_format,
120126
})
121127
}
122128

@@ -193,7 +199,7 @@ impl Row {
193199
///
194200
/// Useful when using query_raw_txt() which sets text transfer mode
195201
pub fn as_text(&self, idx: usize) -> Result<Option<&str>, Error> {
196-
if self.statement.output_format() == Format::Text {
202+
if self.output_format == Format::Text {
197203
match self.col_buffer(idx) {
198204
Some(raw) => {
199205
FromSql::from_sql(&Type::TEXT, raw).map_err(|e| Error::from_sql(e, idx))

tokio-postgres/src/statement.rs

Lines changed: 0 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ use postgres_protocol::{
66
message::{backend::Field, frontend},
77
Oid,
88
};
9-
use postgres_types::Format;
109
use std::{
1110
fmt,
1211
sync::{Arc, Weak},
@@ -17,7 +16,6 @@ struct StatementInner {
1716
name: String,
1817
params: Vec<Type>,
1918
columns: Vec<Column>,
20-
output_format: Format,
2119
}
2220

2321
impl Drop for StatementInner {
@@ -51,22 +49,6 @@ impl Statement {
5149
name,
5250
params,
5351
columns,
54-
output_format: Format::Binary,
55-
}))
56-
}
57-
58-
pub(crate) fn new_text(
59-
inner: &Arc<InnerClient>,
60-
name: String,
61-
params: Vec<Type>,
62-
columns: Vec<Column>,
63-
) -> Statement {
64-
Statement(Arc::new(StatementInner {
65-
client: Arc::downgrade(inner),
66-
name,
67-
params,
68-
columns,
69-
output_format: Format::Text,
7052
}))
7153
}
7254

@@ -83,11 +65,6 @@ impl Statement {
8365
pub fn columns(&self) -> &[Column] {
8466
&self.0.columns
8567
}
86-
87-
/// Returns output format for the statement.
88-
pub fn output_format(&self) -> Format {
89-
self.0.output_format
90-
}
9168
}
9269

9370
/// Information about a column of a query.

tokio-postgres/src/transaction.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -150,13 +150,14 @@ impl<'a> Transaction<'a> {
150150
}
151151

152152
/// Like `Client::query_raw_txt`.
153-
pub async fn query_raw_txt<S, I>(&self, query: S, params: I) -> Result<RowStream, Error>
153+
pub async fn query_raw_txt<T, S, I>(&self, statement: &T, params: I) -> Result<RowStream, Error>
154154
where
155-
S: AsRef<str> + Sync + Send,
155+
T: ?Sized + ToStatement,
156+
S: AsRef<str>,
156157
I: IntoIterator<Item = Option<S>>,
157-
I::IntoIter: ExactSizeIterator + Sync + Send,
158+
I::IntoIter: ExactSizeIterator,
158159
{
159-
self.client.query_raw_txt(query, params).await
160+
self.client.query_raw_txt(statement, params).await
160161
}
161162

162163
/// Like `Client::execute`.

0 commit comments

Comments
 (0)