Skip to content

Commit 54c6172

Browse files
committed
Added replay aware tracing filter
1 parent 4c828ed commit 54c6172

File tree

7 files changed

+207
-3
lines changed

7 files changed

+207
-3
lines changed

Cargo.toml

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,16 @@ license = "MIT"
77
repository = "https://github.com/restatedev/sdk-rust"
88
rust-version = "1.76.0"
99

10+
[[example]]
11+
name = "tracing"
12+
path = "examples/tracing.rs"
13+
required-features = ["tracing-subscriber"]
14+
1015
[features]
1116
default = ["http_server", "rand", "uuid"]
1217
hyper = ["dep:hyper", "http-body-util", "restate-sdk-shared-core/http"]
1318
http_server = ["hyper", "hyper/server", "hyper/http2", "hyper-util", "tokio/net", "tokio/signal", "tokio/macros"]
19+
tracing-subscriber = ["dep:tracing-subscriber"]
1420

1521
[dependencies]
1622
bytes = "1.6.1"
@@ -30,11 +36,12 @@ thiserror = "1.0.63"
3036
tokio = { version = "1", default-features = false, features = ["sync"] }
3137
tower-service = "0.3"
3238
tracing = "0.1"
39+
tracing-subscriber = { version = "0.3", features = ["registry"], optional = true }
3340
uuid = { version = "1.10.0", optional = true }
3441

3542
[dev-dependencies]
3643
tokio = { version = "1", features = ["full"] }
37-
tracing-subscriber = "0.3"
44+
tracing-subscriber = { version = "0.3", features = ["env-filter", "registry"] }
3845
trybuild = "1.0"
3946
reqwest = { version = "0.12", features = ["json"] }
4047
rand = "0.8.5"

examples/tracing.rs

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
use restate_sdk::prelude::*;
2+
use std::convert::Infallible;
3+
use std::time::Duration;
4+
use tracing::info;
5+
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, Layer};
6+
7+
#[restate_sdk::service]
8+
trait Greeter {
9+
async fn greet(name: String) -> Result<String, Infallible>;
10+
}
11+
12+
struct GreeterImpl;
13+
14+
impl Greeter for GreeterImpl {
15+
async fn greet(&self, ctx: Context<'_>, name: String) -> Result<String, Infallible> {
16+
let timeout = 60; // More than suspension timeout to trigger replay
17+
info!("This will be logged on replay");
18+
_ = ctx.service_client::<DelayerClient>().delay(1).call().await;
19+
info!("This will not be logged on replay");
20+
_ = ctx
21+
.service_client::<DelayerClient>()
22+
.delay(timeout)
23+
.call()
24+
.await;
25+
info!("This will be logged on processing after suspension");
26+
Ok(format!("Greetings {name} after {timeout} seconds"))
27+
}
28+
}
29+
30+
#[restate_sdk::service]
31+
trait Delayer {
32+
async fn delay(seconds: u64) -> Result<String, Infallible>;
33+
}
34+
35+
struct DelayerImpl;
36+
37+
impl Delayer for DelayerImpl {
38+
async fn delay(&self, ctx: Context<'_>, seconds: u64) -> Result<String, Infallible> {
39+
_ = ctx.sleep(Duration::from_secs(seconds)).await;
40+
info!("Delayed for {seconds} seconds");
41+
Ok(format!("Delayed {seconds}"))
42+
}
43+
}
44+
45+
#[tokio::main]
46+
async fn main() {
47+
let env_filter = tracing_subscriber::EnvFilter::try_from_default_env()
48+
.unwrap_or_else(|_| "restate_sdk=info".into());
49+
let replay_filter = restate_sdk::filter::ReplayAwareFilter;
50+
tracing_subscriber::registry()
51+
.with(
52+
tracing_subscriber::fmt::layer()
53+
.with_filter(env_filter)
54+
.with_filter(replay_filter),
55+
)
56+
.init();
57+
HttpServer::new(
58+
Endpoint::builder()
59+
.bind(GreeterImpl.serve())
60+
.bind(DelayerImpl.serve())
61+
.build(),
62+
)
63+
.listen_and_serve("0.0.0.0:9080".parse().unwrap())
64+
.await;
65+
}

src/endpoint/context.rs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,10 @@ pub struct ContextInternalInner {
2828
pub(crate) read: InputReceiver,
2929
pub(crate) write: OutputSender,
3030
pub(super) handler_state: HandlerStateNotifier,
31+
// Flag to indicate whether span replay attribute should be set
32+
// When replaying this is set on the sys call
33+
// When not replaying this is reset on the sys call that transitioned the state
34+
pub(super) tracing_replaying_flag: bool,
3135
}
3236

3337
impl ContextInternalInner {
@@ -42,6 +46,7 @@ impl ContextInternalInner {
4246
read,
4347
write,
4448
handler_state,
49+
tracing_replaying_flag: true,
4550
}
4651
}
4752

@@ -50,6 +55,22 @@ impl ContextInternalInner {
5055
.notify_error(e.0.to_string().into(), format!("{:#}", e.0).into(), None);
5156
self.handler_state.mark_error(e);
5257
}
58+
59+
pub(super) fn set_tracing_replaying_flag(&mut self) {
60+
if !self.vm.is_processing() {
61+
// Replay record is not yet set in the span
62+
if self.tracing_replaying_flag {
63+
tracing::Span::current().record("replaying", true);
64+
self.tracing_replaying_flag = false;
65+
}
66+
} else {
67+
// Replay record is not yet reset in the span
68+
if !self.tracing_replaying_flag {
69+
tracing::Span::current().record("replaying", false);
70+
self.tracing_replaying_flag = true;
71+
}
72+
}
73+
}
5374
}
5475

5576
/// Internal context interface.

src/endpoint/futures/async_result_poll.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,10 @@ impl Future for VmAsyncResultPollFuture {
8383

8484
// At this point let's try to take the async result
8585
match inner_lock.vm.take_async_result(handle) {
86-
Ok(Some(v)) => return Poll::Ready(Ok(v)),
86+
Ok(Some(v)) => {
87+
inner_lock.set_tracing_replaying_flag();
88+
return Poll::Ready(Ok(v));
89+
}
8790
Ok(None) => {
8891
drop(inner_lock);
8992
self.state = Some(PollState::WaitingInput { ctx, handle });
@@ -121,7 +124,10 @@ impl Future for VmAsyncResultPollFuture {
121124

122125
// Now try to take async result again
123126
match inner_lock.vm.take_async_result(handle) {
124-
Ok(Some(v)) => return Poll::Ready(Ok(v)),
127+
Ok(Some(v)) => {
128+
inner_lock.set_tracing_replaying_flag();
129+
return Poll::Ready(Ok(v));
130+
}
125131
Ok(None) => {
126132
drop(inner_lock);
127133
self.state = Some(PollState::WaitingInput { ctx, handle });

src/endpoint/mod.rs

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@ use std::future::poll_fn;
1818
use std::pin::Pin;
1919
use std::sync::Arc;
2020
use std::task::{Context, Poll};
21+
#[cfg(feature = "tracing-subscriber")]
22+
use tracing::{field, info_span, Instrument};
2123

2224
const DISCOVERY_CONTENT_TYPE: &str = "application/vnd.restate.endpointmanifest.v1+json";
2325

@@ -344,6 +346,27 @@ impl BidiStreamRunner {
344346
.get(&self.svc_name)
345347
.expect("service must exist at this point");
346348

349+
#[cfg(feature = "tracing-subscriber")]
350+
{
351+
let span = info_span!(
352+
"handle",
353+
"rpc.system" = "restate",
354+
"rpc.service" = self.svc_name,
355+
"rpc.method" = self.handler_name,
356+
"replaying" = field::Empty,
357+
);
358+
handle(
359+
input_rx,
360+
output_tx,
361+
self.vm,
362+
self.svc_name,
363+
self.handler_name,
364+
svc,
365+
)
366+
.instrument(span)
367+
.await
368+
}
369+
#[cfg(not(feature = "tracing-subscriber"))]
347370
handle(
348371
input_rx,
349372
output_tx,

src/filter.rs

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
//! Replay aware tracing filter
2+
//!
3+
//! Use this filter to skip tracing events in the service/workflow while replaying.
4+
//!
5+
//! Example:
6+
//! ```rust,no_run
7+
//! use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, Layer};
8+
//! let replay_filter = restate_sdk::filter::ReplayAwareFilter;
9+
//! tracing_subscriber::registry()
10+
//! .with(tracing_subscriber::fmt::layer().with_filter(replay_filter))
11+
//! .init();
12+
//! ```
13+
use std::fmt::Debug;
14+
use tracing::{
15+
field::{Field, Visit},
16+
span::{Attributes, Record},
17+
Event, Id, Metadata, Subscriber,
18+
};
19+
use tracing_subscriber::{
20+
layer::{Context, Filter},
21+
registry::LookupSpan,
22+
Layer,
23+
};
24+
25+
#[derive(Debug)]
26+
struct ReplayField(bool);
27+
28+
struct ReplayFieldVisitor(bool);
29+
30+
impl Visit for ReplayFieldVisitor {
31+
fn record_bool(&mut self, field: &Field, value: bool) {
32+
if field.name().eq("replaying") {
33+
self.0 = value;
34+
}
35+
}
36+
37+
fn record_debug(&mut self, _field: &Field, _value: &dyn Debug) {}
38+
}
39+
40+
pub struct ReplayAwareFilter;
41+
42+
impl<S: Subscriber + for<'lookup> LookupSpan<'lookup>> Filter<S> for ReplayAwareFilter {
43+
fn enabled(&self, _meta: &Metadata<'_>, _cx: &Context<'_, S>) -> bool {
44+
true
45+
}
46+
47+
fn event_enabled(&self, event: &Event<'_>, cx: &Context<'_, S>) -> bool {
48+
if let Some(scope) = cx.event_scope(event) {
49+
if let Some(span) = scope.from_root().next() {
50+
let extensions = span.extensions();
51+
if let Some(replay) = extensions.get::<ReplayField>() {
52+
return !replay.0;
53+
}
54+
}
55+
true
56+
} else {
57+
true
58+
}
59+
}
60+
61+
fn on_new_span(&self, attrs: &Attributes<'_>, id: &Id, ctx: Context<'_, S>) {
62+
if let Some(span) = ctx.span(id) {
63+
let mut visitor = ReplayFieldVisitor(false);
64+
attrs.record(&mut visitor);
65+
let mut extensions = span.extensions_mut();
66+
extensions.insert::<ReplayField>(ReplayField(visitor.0));
67+
}
68+
}
69+
70+
fn on_record(&self, id: &Id, values: &Record<'_>, ctx: Context<'_, S>) {
71+
if let Some(span) = ctx.span(id) {
72+
let mut visitor = ReplayFieldVisitor(false);
73+
values.record(&mut visitor);
74+
let mut extensions = span.extensions_mut();
75+
extensions.replace::<ReplayField>(ReplayField(visitor.0));
76+
}
77+
}
78+
}
79+
80+
impl<S: Subscriber> Layer<S> for ReplayAwareFilter {}

src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,8 @@ pub mod errors;
4646
pub mod http_server;
4747
#[cfg(feature = "hyper")]
4848
pub mod hyper;
49+
#[cfg(feature = "tracing-subscriber")]
50+
pub mod filter;
4951
pub mod serde;
5052

5153
/// Entry-point macro to define a Restate [Service](https://docs.restate.dev/concepts/services#services-1).

0 commit comments

Comments
 (0)