Skip to content

Commit 0a83179

Browse files
committed
add tracing support
1 parent d451465 commit 0a83179

File tree

8 files changed

+189
-28
lines changed

8 files changed

+189
-28
lines changed

tokio-postgres/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ postgres-protocol = { version = "0.6.5", path = "../postgres-protocol" }
5858
postgres-types = { version = "0.2.4", path = "../postgres-types" }
5959
tokio = { version = "1.27", features = ["io-util"] }
6060
tokio-util = { version = "0.7", features = ["codec"] }
61+
tracing = "0.1"
6162
rand = "0.8.5"
6263

6364
[target.'cfg(not(target_arch = "wasm32"))'.dependencies]

tokio-postgres/src/client.rs

Lines changed: 35 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,13 @@ use crate::simple_query::SimpleQueryStream;
99
#[cfg(feature = "runtime")]
1010
use crate::tls::MakeTlsConnect;
1111
use crate::tls::TlsConnect;
12+
use crate::trace::make_span;
1213
use crate::types::{Oid, ToSql, Type};
1314
#[cfg(feature = "runtime")]
1415
use crate::Socket;
1516
use crate::{
16-
copy_in, copy_out, prepare, query, simple_query, slice_iter, CancelToken, CopyInSink, Error,
17-
Row, SimpleQueryMessage, Statement, ToStatement, Transaction, TransactionBuilder,
17+
copy_in, copy_out, prepare, query, simple_query, slice_iter, CancelToken, Config, CopyInSink,
18+
Error, Row, SimpleQueryMessage, Statement, ToStatement, Transaction, TransactionBuilder,
1819
};
1920
use bytes::{Buf, BytesMut};
2021
use fallible_iterator::FallibleIterator;
@@ -34,6 +35,7 @@ use std::task::{Context, Poll};
3435
#[cfg(feature = "runtime")]
3536
use std::time::Duration;
3637
use tokio::io::{AsyncRead, AsyncWrite};
38+
use tracing::Instrument;
3739

3840
pub struct Responses {
3941
receiver: mpsc::Receiver<BackendMessages>,
@@ -85,6 +87,11 @@ pub struct InnerClient {
8587
sender: mpsc::UnboundedSender<Request>,
8688
cached_typeinfo: Mutex<CachedTypeInfo>,
8789

90+
db_user: String,
91+
db_name: String,
92+
#[cfg(feature = "runtime")]
93+
socket_config: Option<SocketConfig>,
94+
8895
/// A buffer to use when writing out postgres commands.
8996
buffer: Mutex<BytesMut>,
9097
}
@@ -103,6 +110,18 @@ impl InnerClient {
103110
})
104111
}
105112

113+
pub fn db_user(&self) -> &str {
114+
&self.db_user
115+
}
116+
117+
pub fn db_name(&self) -> &str {
118+
&self.db_name
119+
}
120+
121+
pub(crate) fn socket_config(&self) -> Option<&SocketConfig> {
122+
self.socket_config.as_ref()
123+
}
124+
106125
pub fn typeinfo(&self) -> Option<Statement> {
107126
self.cached_typeinfo.lock().typeinfo.clone()
108127
}
@@ -190,11 +209,16 @@ impl Client {
190209
ssl_mode: SslMode,
191210
process_id: i32,
192211
secret_key: i32,
212+
config: &Config,
193213
) -> Client {
194214
Client {
195215
inner: Arc::new(InnerClient {
196216
sender,
197217
cached_typeinfo: Default::default(),
218+
db_user: config.user.clone().unwrap_or_default(),
219+
db_name: config.dbname.clone().unwrap_or_default(),
220+
#[cfg(feature = "runtime")]
221+
socket_config: None,
198222
buffer: Default::default(),
199223
}),
200224
#[cfg(feature = "runtime")]
@@ -211,6 +235,9 @@ impl Client {
211235

212236
#[cfg(feature = "runtime")]
213237
pub(crate) fn set_socket_config(&mut self, socket_config: SocketConfig) {
238+
if let Some(inner) = Arc::get_mut(&mut self.inner) {
239+
inner.socket_config = Some(socket_config.clone());
240+
}
214241
self.socket_config = Some(socket_config);
215242
}
216243

@@ -231,7 +258,12 @@ impl Client {
231258
query: &str,
232259
parameter_types: &[Type],
233260
) -> Result<Statement, Error> {
234-
prepare::prepare(&self.inner, query, parameter_types).await
261+
let span = make_span(&self.inner);
262+
span.record("db.operation", "prepare");
263+
264+
prepare::prepare(&self.inner, query, parameter_types)
265+
.instrument(span)
266+
.await
235267
}
236268

237269
/// Executes a statement, returning a vector of the resulting rows.

tokio-postgres/src/connect_raw.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ where
101101
let (process_id, secret_key, parameters) = read_info(&mut stream).await?;
102102

103103
let (sender, receiver) = mpsc::unbounded();
104-
let client = Client::new(sender, config.ssl_mode, process_id, secret_key);
104+
let client = Client::new(sender, config.ssl_mode, process_id, secret_key, config);
105105
let connection = Connection::new(stream.inner, stream.delayed, parameters, receiver);
106106

107107
Ok((client, connection))

tokio-postgres/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,7 @@ mod socket;
176176
mod statement;
177177
pub mod tls;
178178
mod to_statement;
179+
mod trace;
179180
mod transaction;
180181
mod transaction_builder;
181182
pub mod types;

tokio-postgres/src/prepare.rs

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ use crate::client::InnerClient;
22
use crate::codec::FrontendMessage;
33
use crate::connection::RequestMessages;
44
use crate::error::SqlState;
5+
use crate::trace::make_span;
56
use crate::types::{Field, Kind, Oid, Type};
67
use crate::{query, slice_iter};
78
use crate::{Column, Error, Statement};
@@ -15,6 +16,7 @@ use std::future::Future;
1516
use std::pin::Pin;
1617
use std::sync::atomic::{AtomicUsize, Ordering};
1718
use std::sync::Arc;
19+
use tracing::Instrument;
1820

1921
const TYPEINFO_QUERY: &str = "\
2022
SELECT t.typname, t.typtype, t.typelem, r.rngsubtype, t.typbasetype, n.nspname, t.typrelid
@@ -100,15 +102,23 @@ pub async fn prepare(
100102
}
101103
}
102104

103-
Ok(Statement::new(client, name, parameters, columns))
105+
Ok(Statement::new(
106+
client,
107+
name,
108+
query.to_string(),
109+
parameters,
110+
columns,
111+
))
104112
}
105113

106114
fn prepare_rec<'a>(
107115
client: &'a Arc<InnerClient>,
108116
query: &'a str,
109117
types: &'a [Type],
110118
) -> Pin<Box<dyn Future<Output = Result<Statement, Error>> + 'a + Send>> {
111-
Box::pin(prepare(client, query, types))
119+
let span = make_span(client);
120+
span.record("db.operation", "prepare");
121+
Box::pin(prepare(client, query, types).instrument(span))
112122
}
113123

114124
fn encode(client: &InnerClient, name: &str, query: &str, types: &[Type]) -> Result<Bytes, Error> {

tokio-postgres/src/query.rs

Lines changed: 80 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use crate::client::{InnerClient, Responses};
22
use crate::codec::FrontendMessage;
33
use crate::connection::RequestMessages;
4+
use crate::trace::make_span;
45
use crate::types::{BorrowToSql, IsNull};
56
use crate::{Error, Portal, Row, Statement};
67
use bytes::{Bytes, BytesMut};
@@ -13,6 +14,7 @@ use std::fmt;
1314
use std::marker::PhantomPinned;
1415
use std::pin::Pin;
1516
use std::task::{Context, Poll};
17+
use tracing::{Instrument, Span};
1618

1719
struct BorrowToSqlParamsDebug<'a, T>(&'a [T]);
1820

@@ -27,30 +29,74 @@ where
2729
}
2830
}
2931

30-
pub async fn query<P, I>(
32+
fn encode_with_logs<P, I>(
3133
client: &InnerClient,
32-
statement: Statement,
34+
span: &Span,
35+
statement: &Statement,
3336
params: I,
34-
) -> Result<RowStream, Error>
37+
) -> Result<Bytes, Error>
3538
where
3639
P: BorrowToSql,
3740
I: IntoIterator<Item = P>,
3841
I::IntoIter: ExactSizeIterator,
3942
{
40-
let buf = if log_enabled!(Level::Debug) {
43+
if log_enabled!(Level::Debug) || !span.is_disabled() {
4144
let params = params.into_iter().collect::<Vec<_>>();
4245
debug!(
4346
"executing statement {} with parameters: {:?}",
4447
statement.name(),
4548
BorrowToSqlParamsDebug(params.as_slice()),
4649
);
47-
encode(client, &statement, params)?
50+
if !span.is_disabled() {
51+
span.record("db.statement", statement.query());
52+
53+
// while OTEL supports arrays, we don't want to add that as a dependency, so debug view it is.
54+
let raw_params = BorrowToSqlParamsDebug(params.as_slice());
55+
span.record("db.statement.params", format!("{raw_params:?}"));
56+
}
57+
encode(client, statement, params)
4858
} else {
49-
encode(client, &statement, params)?
50-
};
51-
let responses = start(client, buf).await?;
59+
encode(client, statement, params)
60+
}
61+
}
62+
63+
async fn start_traced(client: &InnerClient, span: &Span, buf: Bytes) -> Result<Responses, Error> {
64+
match start(client, buf).instrument(span.clone()).await {
65+
Ok(response) => {
66+
span.record("otel.status_code", "OK");
67+
Ok(response)
68+
}
69+
Err(e) => {
70+
span.record("otel.status_code", "ERROR");
71+
span.record("exception.message", e.to_string());
72+
Err(e)
73+
}
74+
}
75+
}
76+
77+
pub async fn query<P, I>(
78+
client: &InnerClient,
79+
statement: Statement,
80+
params: I,
81+
) -> Result<RowStream, Error>
82+
where
83+
P: BorrowToSql,
84+
I: IntoIterator<Item = P>,
85+
I::IntoIter: ExactSizeIterator,
86+
{
87+
let span = make_span(client);
88+
span.record("db.operation", "query");
89+
90+
let buf = encode_with_logs(client, &span, &statement, params)?;
91+
92+
let responses = start_traced(client, &span, buf).await?;
93+
94+
span.in_scope(|| {
95+
tracing::trace!("response ready");
96+
});
5297
Ok(RowStream {
5398
statement,
99+
span,
54100
responses,
55101
rows_affected: None,
56102
_p: PhantomPinned,
@@ -62,6 +108,10 @@ pub async fn query_portal(
62108
portal: &Portal,
63109
max_rows: i32,
64110
) -> Result<RowStream, Error> {
111+
let span = make_span(client);
112+
span.record("db.statement", portal.statement().query());
113+
span.record("db.operation", "portal");
114+
65115
let buf = client.with_buf(|buf| {
66116
frontend::execute(portal.name(), max_rows, buf).map_err(Error::encode)?;
67117
frontend::sync(buf);
@@ -72,6 +122,7 @@ pub async fn query_portal(
72122

73123
Ok(RowStream {
74124
statement: portal.statement().clone(),
125+
span,
75126
responses,
76127
rows_affected: None,
77128
_p: PhantomPinned,
@@ -101,25 +152,25 @@ where
101152
I: IntoIterator<Item = P>,
102153
I::IntoIter: ExactSizeIterator,
103154
{
104-
let buf = if log_enabled!(Level::Debug) {
105-
let params = params.into_iter().collect::<Vec<_>>();
106-
debug!(
107-
"executing statement {} with parameters: {:?}",
108-
statement.name(),
109-
BorrowToSqlParamsDebug(params.as_slice()),
110-
);
111-
encode(client, &statement, params)?
112-
} else {
113-
encode(client, &statement, params)?
114-
};
115-
let mut responses = start(client, buf).await?;
155+
let span = make_span(client);
156+
span.record("db.operation", "execute");
157+
158+
let buf = encode_with_logs(client, &span, &statement, params)?;
159+
160+
let mut responses = start_traced(client, &span, buf).await?;
161+
162+
span.in_scope(|| {
163+
tracing::trace!("response ready");
164+
});
116165

117166
let mut rows = 0;
118167
loop {
119-
match responses.next().await? {
168+
match responses.next().instrument(span.clone()).await? {
120169
Message::DataRow(_) => {}
121170
Message::CommandComplete(body) => {
122171
rows = extract_row_affected(&body)?;
172+
span.record("db.sql.rows_affected", rows);
173+
tracing::trace!("execute complete");
123174
}
124175
Message::EmptyQueryResponse => rows = 0,
125176
Message::ReadyForQuery(_) => return Ok(rows),
@@ -206,6 +257,7 @@ pin_project! {
206257
/// A stream of table rows.
207258
pub struct RowStream {
208259
statement: Statement,
260+
span: Span,
209261
responses: Responses,
210262
rows_affected: Option<u64>,
211263
#[pin]
@@ -218,13 +270,19 @@ impl Stream for RowStream {
218270

219271
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
220272
let this = self.project();
273+
let _span = this.span.enter();
221274
loop {
222275
match ready!(this.responses.poll_next(cx)?) {
223276
Message::DataRow(body) => {
224277
return Poll::Ready(Some(Ok(Row::new(this.statement.clone(), body)?)))
225278
}
226279
Message::CommandComplete(body) => {
227-
*this.rows_affected = Some(extract_row_affected(&body)?);
280+
let rows_affected = extract_row_affected(&body)?;
281+
282+
this.span.record("db.sql.rows_affected", rows_affected);
283+
tracing::trace!("query complete");
284+
285+
*this.rows_affected = Some(rows_affected);
228286
}
229287
Message::EmptyQueryResponse | Message::PortalSuspended => {}
230288
Message::ReadyForQuery(_) => return Poll::Ready(None),

tokio-postgres/src/statement.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ use std::{
1111
struct StatementInner {
1212
client: Weak<InnerClient>,
1313
name: String,
14+
query: String,
1415
params: Vec<Type>,
1516
columns: Vec<Column>,
1617
}
@@ -38,12 +39,14 @@ impl Statement {
3839
pub(crate) fn new(
3940
inner: &Arc<InnerClient>,
4041
name: String,
42+
query: String,
4143
params: Vec<Type>,
4244
columns: Vec<Column>,
4345
) -> Statement {
4446
Statement(Arc::new(StatementInner {
4547
client: Arc::downgrade(inner),
4648
name,
49+
query,
4750
params,
4851
columns,
4952
}))
@@ -53,6 +56,11 @@ impl Statement {
5356
&self.0.name
5457
}
5558

59+
/// Returns the query that was used to create this statement.
60+
pub fn query(&self) -> &str {
61+
&self.0.query
62+
}
63+
5664
/// Returns the expected types of the statement's parameters.
5765
pub fn params(&self) -> &[Type] {
5866
&self.0.params

0 commit comments

Comments
 (0)