Skip to content

Commit 041afd3

Browse files
h7kannaslinkydeveloper
authored andcommitted
Added replay aware tracing filter
1 parent c4243fe commit 041afd3

File tree

6 files changed

+199
-1
lines changed

6 files changed

+199
-1
lines changed

Cargo.toml

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,16 @@ license = "MIT"
77
repository = "https://github.com/restatedev/sdk-rust"
88
rust-version = "1.76.0"
99

10+
[[example]]
11+
name = "tracing"
12+
path = "examples/tracing.rs"
13+
required-features = ["tracing-subscriber"]
14+
1015
[features]
1116
default = ["http_server", "rand", "uuid"]
1217
hyper = ["dep:hyper", "http-body-util", "restate-sdk-shared-core/http"]
1318
http_server = ["hyper", "hyper/server", "hyper/http2", "hyper-util", "tokio/net", "tokio/signal", "tokio/macros"]
19+
tracing-subscriber = ["dep:tracing-subscriber"]
1420

1521

1622
[dependencies]
@@ -31,11 +37,12 @@ thiserror = "1.0.63"
3137
tokio = { version = "1", default-features = false, features = ["sync"] }
3238
tower-service = "0.3"
3339
tracing = "0.1"
40+
tracing-subscriber = { version = "0.3", features = ["registry"], optional = true }
3441
uuid = { version = "1.10.0", optional = true }
3542

3643
[dev-dependencies]
3744
tokio = { version = "1", features = ["full"] }
38-
tracing-subscriber = "0.3"
45+
tracing-subscriber = { version = "0.3", features = ["env-filter", "registry"] }
3946
trybuild = "1.0"
4047
reqwest = { version = "0.12", features = ["json"] }
4148
rand = "0.8.5"

examples/tracing.rs

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
use restate_sdk::prelude::*;
2+
use std::convert::Infallible;
3+
use std::time::Duration;
4+
use tracing::info;
5+
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, Layer};
6+
7+
#[restate_sdk::service]
8+
trait Greeter {
9+
async fn greet(name: String) -> Result<String, Infallible>;
10+
}
11+
12+
struct GreeterImpl;
13+
14+
impl Greeter for GreeterImpl {
15+
async fn greet(&self, ctx: Context<'_>, name: String) -> Result<String, Infallible> {
16+
let timeout = 60; // More than suspension timeout to trigger replay
17+
info!("This will be logged on replay");
18+
_ = ctx.service_client::<DelayerClient>().delay(1).call().await;
19+
info!("This will not be logged on replay");
20+
_ = ctx
21+
.service_client::<DelayerClient>()
22+
.delay(timeout)
23+
.call()
24+
.await;
25+
info!("This will be logged on processing after suspension");
26+
Ok(format!("Greetings {name} after {timeout} seconds"))
27+
}
28+
}
29+
30+
#[restate_sdk::service]
31+
trait Delayer {
32+
async fn delay(seconds: u64) -> Result<String, Infallible>;
33+
}
34+
35+
struct DelayerImpl;
36+
37+
impl Delayer for DelayerImpl {
38+
async fn delay(&self, ctx: Context<'_>, seconds: u64) -> Result<String, Infallible> {
39+
_ = ctx.sleep(Duration::from_secs(seconds)).await;
40+
info!("Delayed for {seconds} seconds");
41+
Ok(format!("Delayed {seconds}"))
42+
}
43+
}
44+
45+
#[tokio::main]
46+
async fn main() {
47+
let env_filter = tracing_subscriber::EnvFilter::try_from_default_env()
48+
.unwrap_or_else(|_| "restate_sdk=info".into());
49+
let replay_filter = restate_sdk::filter::ReplayAwareFilter;
50+
tracing_subscriber::registry()
51+
.with(
52+
tracing_subscriber::fmt::layer()
53+
.with_filter(env_filter)
54+
.with_filter(replay_filter),
55+
)
56+
.init();
57+
HttpServer::new(
58+
Endpoint::builder()
59+
.bind(GreeterImpl.serve())
60+
.bind(DelayerImpl.serve())
61+
.build(),
62+
)
63+
.listen_and_serve("0.0.0.0:9080".parse().unwrap())
64+
.await;
65+
}

src/endpoint/context.rs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,10 @@ pub struct ContextInternalInner {
3030
pub(crate) read: InputReceiver,
3131
pub(crate) write: OutputSender,
3232
pub(super) handler_state: HandlerStateNotifier,
33+
// Flag to indicate whether span replay attribute should be set
34+
// When replaying this is set on the sys call
35+
// When not replaying this is reset on the sys call that transitioned the state
36+
pub(super) tracing_replaying_flag: bool,
3337
}
3438

3539
impl ContextInternalInner {
@@ -44,6 +48,7 @@ impl ContextInternalInner {
4448
read,
4549
write,
4650
handler_state,
51+
tracing_replaying_flag: true,
4752
}
4853
}
4954

@@ -55,6 +60,22 @@ impl ContextInternalInner {
5560
);
5661
self.handler_state.mark_error(e);
5762
}
63+
64+
pub(super) fn set_tracing_replaying_flag(&mut self) {
65+
if !self.vm.is_processing() {
66+
// Replay record is not yet set in the span
67+
if self.tracing_replaying_flag {
68+
tracing::Span::current().record("replaying", true);
69+
self.tracing_replaying_flag = false;
70+
}
71+
} else {
72+
// Replay record is not yet reset in the span
73+
if !self.tracing_replaying_flag {
74+
tracing::Span::current().record("replaying", false);
75+
self.tracing_replaying_flag = true;
76+
}
77+
}
78+
}
5879
}
5980

6081
/// Internal context interface.

src/endpoint/mod.rs

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@ use std::future::poll_fn;
1818
use std::pin::Pin;
1919
use std::sync::Arc;
2020
use std::task::{Context, Poll};
21+
#[cfg(feature = "tracing-subscriber")]
22+
use tracing::{field, info_span, Instrument};
2123

2224
const DISCOVERY_CONTENT_TYPE: &str = "application/vnd.restate.endpointmanifest.v1+json";
2325

@@ -368,6 +370,27 @@ impl BidiStreamRunner {
368370
.get(&self.svc_name)
369371
.expect("service must exist at this point");
370372

373+
#[cfg(feature = "tracing-subscriber")]
374+
{
375+
let span = info_span!(
376+
"handle",
377+
"rpc.system" = "restate",
378+
"rpc.service" = self.svc_name,
379+
"rpc.method" = self.handler_name,
380+
"replaying" = field::Empty,
381+
);
382+
handle(
383+
input_rx,
384+
output_tx,
385+
self.vm,
386+
self.svc_name,
387+
self.handler_name,
388+
svc,
389+
)
390+
.instrument(span)
391+
.await
392+
}
393+
#[cfg(not(feature = "tracing-subscriber"))]
371394
handle(
372395
input_rx,
373396
output_tx,

src/filter.rs

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
//! Replay aware tracing filter
2+
//!
3+
//! Use this filter to skip tracing events in the service/workflow while replaying.
4+
//!
5+
//! Example:
6+
//! ```rust,no_run
7+
//! use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, Layer};
8+
//! let replay_filter = restate_sdk::filter::ReplayAwareFilter;
9+
//! tracing_subscriber::registry()
10+
//! .with(tracing_subscriber::fmt::layer().with_filter(replay_filter))
11+
//! .init();
12+
//! ```
13+
use std::fmt::Debug;
14+
use tracing::{
15+
field::{Field, Visit},
16+
span::{Attributes, Record},
17+
Event, Id, Metadata, Subscriber,
18+
};
19+
use tracing_subscriber::{
20+
layer::{Context, Filter},
21+
registry::LookupSpan,
22+
Layer,
23+
};
24+
25+
#[derive(Debug)]
26+
struct ReplayField(bool);
27+
28+
struct ReplayFieldVisitor(bool);
29+
30+
impl Visit for ReplayFieldVisitor {
31+
fn record_bool(&mut self, field: &Field, value: bool) {
32+
if field.name().eq("replaying") {
33+
self.0 = value;
34+
}
35+
}
36+
37+
fn record_debug(&mut self, _field: &Field, _value: &dyn Debug) {}
38+
}
39+
40+
pub struct ReplayAwareFilter;
41+
42+
impl<S: Subscriber + for<'lookup> LookupSpan<'lookup>> Filter<S> for ReplayAwareFilter {
43+
fn enabled(&self, _meta: &Metadata<'_>, _cx: &Context<'_, S>) -> bool {
44+
true
45+
}
46+
47+
fn event_enabled(&self, event: &Event<'_>, cx: &Context<'_, S>) -> bool {
48+
if let Some(scope) = cx.event_scope(event) {
49+
if let Some(span) = scope.from_root().next() {
50+
let extensions = span.extensions();
51+
if let Some(replay) = extensions.get::<ReplayField>() {
52+
return !replay.0;
53+
}
54+
}
55+
true
56+
} else {
57+
true
58+
}
59+
}
60+
61+
fn on_new_span(&self, attrs: &Attributes<'_>, id: &Id, ctx: Context<'_, S>) {
62+
if let Some(span) = ctx.span(id) {
63+
let mut visitor = ReplayFieldVisitor(false);
64+
attrs.record(&mut visitor);
65+
let mut extensions = span.extensions_mut();
66+
extensions.insert::<ReplayField>(ReplayField(visitor.0));
67+
}
68+
}
69+
70+
fn on_record(&self, id: &Id, values: &Record<'_>, ctx: Context<'_, S>) {
71+
if let Some(span) = ctx.span(id) {
72+
let mut visitor = ReplayFieldVisitor(false);
73+
values.record(&mut visitor);
74+
let mut extensions = span.extensions_mut();
75+
extensions.replace::<ReplayField>(ReplayField(visitor.0));
76+
}
77+
}
78+
}
79+
80+
impl<S: Subscriber> Layer<S> for ReplayAwareFilter {}

src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -222,6 +222,8 @@ pub mod errors;
222222
pub mod http_server;
223223
#[cfg(feature = "hyper")]
224224
pub mod hyper;
225+
#[cfg(feature = "tracing-subscriber")]
226+
pub mod filter;
225227
pub mod serde;
226228

227229
/// Entry-point macro to define a Restate [Service](https://docs.restate.dev/concepts/services#services-1).

0 commit comments

Comments
 (0)