Skip to content

Commit 7f219c7

Browse files
A few changes to the replay-aware filter feature:
* We now always emit the span for the endpoint handle method. This seems a reasonable default, and doesn't require the tracing-subscriber dependency (we already pull in tracing as a dependency anyway).
* Renamed the span and the replaying field used by the filter, to avoid confusion and name clashes.
* The replay-aware filter now looks for the exact span name created by the handler. This should make matching the exact span we want to use for filtering more robust.
* Flip the replaying field not only when taking the result of an awaited operation, but also when executing `sys_` functions on the state machine. Those are the ones that can cause state transitions.
* Simplify the tracing example.
1 parent 041afd3 commit 7f219c7

File tree

7 files changed

+100
-116
lines changed

7 files changed

+100
-116
lines changed

Cargo.toml

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,14 +10,13 @@ rust-version = "1.76.0"
1010
[[example]]
1111
name = "tracing"
1212
path = "examples/tracing.rs"
13-
required-features = ["tracing-subscriber"]
13+
required-features = ["span-filter"]
1414

1515
[features]
16-
default = ["http_server", "rand", "uuid"]
16+
default = ["http_server", "rand", "uuid", "span-filter"]
1717
hyper = ["dep:hyper", "http-body-util", "restate-sdk-shared-core/http"]
1818
http_server = ["hyper", "hyper/server", "hyper/http2", "hyper-util", "tokio/net", "tokio/signal", "tokio/macros"]
19-
tracing-subscriber = ["dep:tracing-subscriber"]
20-
19+
span-filter = ["dep:tracing-subscriber"]
2120

2221
[dependencies]
2322
bytes = "1.6.1"

examples/tracing.rs

Lines changed: 10 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -1,51 +1,28 @@
11
use restate_sdk::prelude::*;
2-
use std::convert::Infallible;
32
use std::time::Duration;
43
use tracing::info;
54
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, Layer};
65

76
#[restate_sdk::service]
87
trait Greeter {
9-
async fn greet(name: String) -> Result<String, Infallible>;
8+
async fn greet(name: String) -> Result<String, HandlerError>;
109
}
1110

1211
struct GreeterImpl;
1312

1413
impl Greeter for GreeterImpl {
15-
async fn greet(&self, ctx: Context<'_>, name: String) -> Result<String, Infallible> {
16-
let timeout = 60; // More than suspension timeout to trigger replay
17-
info!("This will be logged on replay");
18-
_ = ctx.service_client::<DelayerClient>().delay(1).call().await;
19-
info!("This will not be logged on replay");
20-
_ = ctx
21-
.service_client::<DelayerClient>()
22-
.delay(timeout)
23-
.call()
24-
.await;
25-
info!("This will be logged on processing after suspension");
26-
Ok(format!("Greetings {name} after {timeout} seconds"))
27-
}
28-
}
29-
30-
#[restate_sdk::service]
31-
trait Delayer {
32-
async fn delay(seconds: u64) -> Result<String, Infallible>;
33-
}
34-
35-
struct DelayerImpl;
36-
37-
impl Delayer for DelayerImpl {
38-
async fn delay(&self, ctx: Context<'_>, seconds: u64) -> Result<String, Infallible> {
39-
_ = ctx.sleep(Duration::from_secs(seconds)).await;
40-
info!("Delayed for {seconds} seconds");
41-
Ok(format!("Delayed {seconds}"))
14+
async fn greet(&self, ctx: Context<'_>, name: String) -> Result<String, HandlerError> {
15+
info!("Before sleep");
16+
ctx.sleep(Duration::from_secs(61)).await?; // More than suspension timeout to trigger replay
17+
info!("After sleep");
18+
Ok(format!("Greetings {name}"))
4219
}
4320
}
4421

4522
#[tokio::main]
4623
async fn main() {
4724
let env_filter = tracing_subscriber::EnvFilter::try_from_default_env()
48-
.unwrap_or_else(|_| "restate_sdk=info".into());
25+
.unwrap_or_else(|_| "info,restate_sdk=debug".into());
4926
let replay_filter = restate_sdk::filter::ReplayAwareFilter;
5027
tracing_subscriber::registry()
5128
.with(
@@ -54,12 +31,7 @@ async fn main() {
5431
.with_filter(replay_filter),
5532
)
5633
.init();
57-
HttpServer::new(
58-
Endpoint::builder()
59-
.bind(GreeterImpl.serve())
60-
.bind(DelayerImpl.serve())
61-
.build(),
62-
)
63-
.listen_and_serve("0.0.0.0:9080".parse().unwrap())
64-
.await;
34+
HttpServer::new(Endpoint::builder().bind(GreeterImpl.serve()).build())
35+
.listen_and_serve("0.0.0.0:9080".parse().unwrap())
36+
.await;
6537
}

src/endpoint/context.rs

Lines changed: 32 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -30,10 +30,10 @@ pub struct ContextInternalInner {
3030
pub(crate) read: InputReceiver,
3131
pub(crate) write: OutputSender,
3232
pub(super) handler_state: HandlerStateNotifier,
33-
// Flag to indicate whether span replay attribute should be set
34-
// When replaying this is set on the sys call
35-
// When not replaying this is reset on the sys call that transitioned the state
36-
pub(super) tracing_replaying_flag: bool,
33+
34+
/// We remember here the last value recorded in the span's replaying field, because setting it might be expensive (it's guarded behind locks and other machinery).
35+
/// For details, see [ContextInternalInner::maybe_flip_span_replaying_field]
36+
pub(super) span_replaying_field_state: bool,
3737
}
3838

3939
impl ContextInternalInner {
@@ -48,11 +48,12 @@ impl ContextInternalInner {
4848
read,
4949
write,
5050
handler_state,
51-
tracing_replaying_flag: true,
51+
span_replaying_field_state: false,
5252
}
5353
}
5454

5555
pub(super) fn fail(&mut self, e: Error) {
56+
self.maybe_flip_span_replaying_field();
5657
self.vm.notify_error(
5758
CoreError::new(500u16, e.0.to_string())
5859
.with_stacktrace(Cow::Owned(format!("{:#}", e.0))),
@@ -61,19 +62,13 @@ impl ContextInternalInner {
6162
self.handler_state.mark_error(e);
6263
}
6364

64-
pub(super) fn set_tracing_replaying_flag(&mut self) {
65-
if !self.vm.is_processing() {
66-
// Replay record is not yet set in the span
67-
if self.tracing_replaying_flag {
68-
tracing::Span::current().record("replaying", true);
69-
self.tracing_replaying_flag = false;
70-
}
71-
} else {
72-
// Replay record is not yet reset in the span
73-
if !self.tracing_replaying_flag {
74-
tracing::Span::current().record("replaying", false);
75-
self.tracing_replaying_flag = true;
76-
}
65+
pub(super) fn maybe_flip_span_replaying_field(&mut self) {
66+
if !self.span_replaying_field_state && self.vm.is_replaying() {
67+
tracing::Span::current().record("restate.sdk.is_replaying", true);
68+
self.span_replaying_field_state = true;
69+
} else if self.span_replaying_field_state && !self.vm.is_replaying() {
70+
tracing::Span::current().record("restate.sdk.is_replaying", false);
71+
self.span_replaying_field_state = false;
7772
}
7873
}
7974
}
@@ -211,6 +206,7 @@ impl ContextInternal {
211206
},
212207
))
213208
});
209+
inner_lock.maybe_flip_span_replaying_field();
214210

215211
match input_result {
216212
Ok(Ok(i)) => {
@@ -244,6 +240,7 @@ impl ContextInternal {
244240
) -> impl Future<Output = Result<Option<T>, TerminalError>> + Send {
245241
let mut inner_lock = must_lock!(self.inner);
246242
let handle = unwrap_or_trap!(inner_lock, inner_lock.vm.sys_state_get(key.to_owned()));
243+
inner_lock.maybe_flip_span_replaying_field();
247244

248245
let poll_future = get_async_result(Arc::clone(&self.inner), handle).map(|res| match res {
249246
Ok(Value::Void) => Ok(Ok(None)),
@@ -267,6 +264,7 @@ impl ContextInternal {
267264
pub fn get_keys(&self) -> impl Future<Output = Result<Vec<String>, TerminalError>> + Send {
268265
let mut inner_lock = must_lock!(self.inner);
269266
let handle = unwrap_or_trap!(inner_lock, inner_lock.vm.sys_state_get_keys());
267+
inner_lock.maybe_flip_span_replaying_field();
270268

271269
let poll_future = get_async_result(Arc::clone(&self.inner), handle).map(|res| match res {
272270
Ok(Value::Failure(f)) => Ok(Err(f.into())),
@@ -287,6 +285,7 @@ impl ContextInternal {
287285
match t.serialize() {
288286
Ok(b) => {
289287
let _ = inner_lock.vm.sys_state_set(key.to_owned(), b);
288+
inner_lock.maybe_flip_span_replaying_field();
290289
}
291290
Err(e) => {
292291
inner_lock.fail(Error::serialization("set_state", e));
@@ -295,11 +294,15 @@ impl ContextInternal {
295294
}
296295

297296
pub fn clear(&self, key: &str) {
298-
let _ = must_lock!(self.inner).vm.sys_state_clear(key.to_string());
297+
let mut inner_lock = must_lock!(self.inner);
298+
let _ = inner_lock.vm.sys_state_clear(key.to_string());
299+
inner_lock.maybe_flip_span_replaying_field();
299300
}
300301

301302
pub fn clear_all(&self) {
302-
let _ = must_lock!(self.inner).vm.sys_state_clear_all();
303+
let mut inner_lock = must_lock!(self.inner);
304+
let _ = inner_lock.vm.sys_state_clear_all();
305+
inner_lock.maybe_flip_span_replaying_field();
303306
}
304307

305308
pub fn sleep(
@@ -314,6 +317,7 @@ impl ContextInternal {
314317
inner_lock,
315318
inner_lock.vm.sys_sleep(now + sleep_duration, Some(now))
316319
);
320+
inner_lock.maybe_flip_span_replaying_field();
317321

318322
let poll_future = get_async_result(Arc::clone(&self.inner), handle).map(|res| match res {
319323
Ok(Value::Void) => Ok(Ok(())),
@@ -349,6 +353,7 @@ impl ContextInternal {
349353
);
350354

351355
let call_handle = unwrap_or_trap!(inner_lock, inner_lock.vm.sys_call(target, input));
356+
inner_lock.maybe_flip_span_replaying_field();
352357
drop(inner_lock);
353358

354359
// Let's prepare the two futures here
@@ -432,6 +437,7 @@ impl ContextInternal {
432437
return Either::Right(TrapFuture::<()>::default());
433438
}
434439
};
440+
inner_lock.maybe_flip_span_replaying_field();
435441
drop(inner_lock);
436442

437443
let invocation_id_fut = InterceptErrorFuture::new(
@@ -473,6 +479,7 @@ impl ContextInternal {
473479
) {
474480
let mut inner_lock = must_lock!(self.inner);
475481
let maybe_awakeable_id_and_handle = inner_lock.vm.sys_awakeable();
482+
inner_lock.maybe_flip_span_replaying_field();
476483

477484
let (awakeable_id, handle) = match maybe_awakeable_id_and_handle {
478485
Ok((s, handle)) => (s, handle),
@@ -533,6 +540,7 @@ impl ContextInternal {
533540
) -> impl Future<Output = Result<T, TerminalError>> + Send {
534541
let mut inner_lock = must_lock!(self.inner);
535542
let handle = unwrap_or_trap!(inner_lock, inner_lock.vm.sys_get_promise(name.to_owned()));
543+
inner_lock.maybe_flip_span_replaying_field();
536544
drop(inner_lock);
537545

538546
let poll_future = get_async_result(Arc::clone(&self.inner), handle).map(|res| match res {
@@ -558,6 +566,7 @@ impl ContextInternal {
558566
) -> impl Future<Output = Result<Option<T>, TerminalError>> + Send {
559567
let mut inner_lock = must_lock!(self.inner);
560568
let handle = unwrap_or_trap!(inner_lock, inner_lock.vm.sys_peek_promise(name.to_owned()));
569+
inner_lock.maybe_flip_span_replaying_field();
561570
drop(inner_lock);
562571

563572
let poll_future = get_async_result(Arc::clone(&self.inner), handle).map(|res| match res {
@@ -646,6 +655,7 @@ impl ContextInternal {
646655
};
647656

648657
let _ = inner_lock.vm.sys_write_output(res_to_write);
658+
inner_lock.maybe_flip_span_replaying_field();
649659
}
650660

651661
pub fn end(&self) {
@@ -880,6 +890,7 @@ impl<InvIdFut: Future<Output = Result<String, TerminalError>> + Send> Invocation
880890
let inv_id = cloned_invocation_id_fut.await?;
881891
let mut inner_lock = must_lock!(cloned_ctx);
882892
let _ = inner_lock.vm.sys_cancel_invocation(inv_id);
893+
inner_lock.maybe_flip_span_replaying_field();
883894
drop(inner_lock);
884895
Ok(())
885896
}
@@ -924,6 +935,7 @@ where
924935
let inv_id = cloned_invocation_id_fut.await?;
925936
let mut inner_lock = must_lock!(cloned_ctx);
926937
let _ = inner_lock.vm.sys_cancel_invocation(inv_id);
938+
inner_lock.maybe_flip_span_replaying_field();
927939
drop(inner_lock);
928940
Ok(())
929941
}

src/endpoint/futures/async_result_poll.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,9 @@ impl Future for VmAsyncResultPollFuture {
128128
}
129129
};
130130

131+
// DoProgress might cause a flip of the replaying state
132+
inner_lock.maybe_flip_span_replaying_field();
133+
131134
// At this point let's try to take the notification
132135
match inner_lock.vm.take_notification(handle) {
133136
Ok(Some(v)) => return Poll::Ready(Ok(v)),

src/endpoint/mod.rs

Lines changed: 9 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,7 @@ use std::future::poll_fn;
1818
use std::pin::Pin;
1919
use std::sync::Arc;
2020
use std::task::{Context, Poll};
21-
#[cfg(feature = "tracing-subscriber")]
22-
use tracing::{field, info_span, Instrument};
21+
use tracing::{info_span, Instrument};
2322

2423
const DISCOVERY_CONTENT_TYPE: &str = "application/vnd.restate.endpointmanifest.v1+json";
2524

@@ -370,27 +369,13 @@ impl BidiStreamRunner {
370369
.get(&self.svc_name)
371370
.expect("service must exist at this point");
372371

373-
#[cfg(feature = "tracing-subscriber")]
374-
{
375-
let span = info_span!(
376-
"handle",
377-
"rpc.system" = "restate",
378-
"rpc.service" = self.svc_name,
379-
"rpc.method" = self.handler_name,
380-
"replaying" = field::Empty,
381-
);
382-
handle(
383-
input_rx,
384-
output_tx,
385-
self.vm,
386-
self.svc_name,
387-
self.handler_name,
388-
svc,
389-
)
390-
.instrument(span)
391-
.await
392-
}
393-
#[cfg(not(feature = "tracing-subscriber"))]
372+
let span = info_span!(
373+
"restate_sdk_endpoint_handle",
374+
"rpc.system" = "restate",
375+
"rpc.service" = self.svc_name,
376+
"rpc.method" = self.handler_name,
377+
"restate.sdk.is_replaying" = false
378+
);
394379
handle(
395380
input_rx,
396381
output_tx,
@@ -399,6 +384,7 @@ impl BidiStreamRunner {
399384
self.handler_name,
400385
svc,
401386
)
387+
.instrument(span)
402388
.await
403389
}
404390
}

0 commit comments

Comments
 (0)