Skip to content

Commit 54d4e55

Browse files
Side effect retry (#20)
1 parent 0a337f3 commit 54d4e55

File tree

16 files changed

+741
-460
lines changed

16 files changed

+741
-460
lines changed

.github/workflows/integration.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ jobs:
4141
name: "Features integration test (sdk-test-suite version ${{ matrix.sdk-test-suite }})"
4242
strategy:
4343
matrix:
44-
sdk-test-suite: [ "1.8" ]
44+
sdk-test-suite: [ "2.0" ]
4545
permissions:
4646
contents: read
4747
issues: read

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ pin-project-lite = "0.2"
2222
rand = { version = "0.8.5", optional = true }
2323
regress = "0.10"
2424
restate-sdk-macros = { version = "0.2.1", path = "macros" }
25-
restate-sdk-shared-core = { version = "0.0.5" }
25+
restate-sdk-shared-core = { git = "https://github.com/restatedev/sdk-shared-core.git", branch = "main" }
2626
serde = "1.0"
2727
serde_json = "1.0"
2828
thiserror = "1.0.63"

examples/failures.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ struct MyError;
1616
impl FailureExample for FailureExampleImpl {
1717
async fn do_run(&self, context: Context<'_>) -> HandlerResult<()> {
1818
context
19-
.run("get_ip", || async move {
19+
.run(|| async move {
2020
if rand::thread_rng().next_u32() % 4 == 0 {
2121
return Err(TerminalError::new("Failed!!!").into());
2222
}

examples/run.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ struct RunExampleImpl(reqwest::Client);
1111
impl RunExample for RunExampleImpl {
1212
async fn do_run(&self, context: Context<'_>) -> HandlerResult<Json<HashMap<String, String>>> {
1313
let res = context
14-
.run("get_ip", || async move {
14+
.run(|| async move {
1515
let req = self.0.get("https://httpbin.org/ip").build()?;
1616

1717
let res = self
@@ -23,6 +23,7 @@ impl RunExample for RunExampleImpl {
2323

2424
Ok(Json::from(res))
2525
})
26+
.named("get_ip")
2627
.await?
2728
.into_inner();
2829

src/context/mod.rs

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,8 @@ mod request;
1010
mod run;
1111

1212
pub use request::{Request, RequestTarget};
13-
pub use run::RunClosure;
13+
pub use run::{RunClosure, RunFuture, RunRetryPolicy};
14+
1415
pub type HeaderMap = http::HeaderMap<String>;
1516

1617
/// Service handler context.
@@ -371,18 +372,14 @@ impl<'ctx, CTX: private::SealedContext<'ctx>> ContextAwakeables<'ctx> for CTX {}
371372
/// Trait exposing Restate functionalities to deal with non-deterministic operations.
372373
pub trait ContextSideEffects<'ctx>: private::SealedContext<'ctx> {
373374
/// Run a non-deterministic operation and record its result.
374-
///
375-
fn run<R, F, T>(
376-
&self,
377-
name: &'ctx str,
378-
run_closure: R,
379-
) -> impl Future<Output = Result<T, TerminalError>> + 'ctx
375+
#[must_use]
376+
fn run<R, F, T>(&self, run_closure: R) -> impl RunFuture<Result<T, TerminalError>> + 'ctx
380377
where
381378
R: RunClosure<Fut = F, Output = T> + Send + Sync + 'ctx,
382-
T: Serialize + Deserialize,
383379
F: Future<Output = HandlerResult<T>> + Send + Sync + 'ctx,
380+
T: Serialize + Deserialize + 'static,
384381
{
385-
self.inner_context().run(name, run_closure)
382+
self.inner_context().run(run_closure)
386383
}
387384

388385
/// Return a random seed inherently predictable, based on the invocation id, which is not secret.

src/context/run.rs

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use crate::errors::HandlerResult;
22
use crate::serde::{Deserialize, Serialize};
33
use std::future::Future;
4+
use std::time::Duration;
45

56
/// Run closure trait
67
pub trait RunClosure {
@@ -23,3 +24,94 @@ where
2324
self()
2425
}
2526
}
27+
28+
/// Future created using [`super::ContextSideEffects::run`].
29+
pub trait RunFuture<O>: Future<Output = O> {
30+
/// Provide a custom retry policy for this `run` operation.
31+
///
32+
/// If unspecified, the `run` will be retried using the [Restate invoker retry policy](https://docs.restate.dev/operate/configuration/server),
33+
/// which by default retries indefinitely.
34+
fn with_retry_policy(self, retry_policy: RunRetryPolicy) -> Self;
35+
36+
/// Define a name for this `run` operation.
37+
///
38+
/// This is used mainly for observability.
39+
fn named(self, name: impl Into<String>) -> Self;
40+
}
41+
42+
/// This struct represents the policy to execute retries for run closures.
43+
#[derive(Debug, Clone)]
44+
pub struct RunRetryPolicy {
45+
pub(crate) initial_interval: Duration,
46+
pub(crate) factor: f32,
47+
pub(crate) max_interval: Option<Duration>,
48+
pub(crate) max_attempts: Option<u32>,
49+
pub(crate) max_duration: Option<Duration>,
50+
}
51+
52+
impl Default for RunRetryPolicy {
53+
fn default() -> Self {
54+
Self {
55+
initial_interval: Duration::from_millis(100),
56+
factor: 2.0,
57+
max_interval: Some(Duration::from_secs(2)),
58+
max_attempts: None,
59+
max_duration: Some(Duration::from_secs(50)),
60+
}
61+
}
62+
}
63+
64+
impl RunRetryPolicy {
65+
/// Create a new retry policy.
66+
pub fn new() -> Self {
67+
Self {
68+
initial_interval: Duration::from_millis(100),
69+
factor: 1.0,
70+
max_interval: None,
71+
max_attempts: None,
72+
max_duration: None,
73+
}
74+
}
75+
76+
/// Initial interval for the first retry attempt.
77+
pub fn with_initial_interval(mut self, initial_interval: Duration) -> Self {
78+
self.initial_interval = initial_interval;
79+
self
80+
}
81+
82+
/// Maximum interval between retries.
83+
pub fn with_factor(mut self, factor: f32) -> Self {
84+
self.factor = factor;
85+
self
86+
}
87+
88+
/// Maximum interval between retries.
89+
pub fn with_max_interval(mut self, max_interval: Duration) -> Self {
90+
self.max_interval = Some(max_interval);
91+
self
92+
}
93+
94+
/// Gives up retrying when either at least the given number of attempts is reached,
95+
/// or `max_duration` (if set) is reached first.
96+
///
97+
/// **Note:** The number of actual retries may be higher than the provided value.
98+
/// This is due to the nature of the run operation, which executes the closure on the service and sends the result afterward to Restate.
99+
///
100+
/// Infinite retries if this field and `max_duration` are unset.
101+
pub fn with_max_attempts(mut self, max_attempts: u32) -> Self {
102+
self.max_attempts = Some(max_attempts);
103+
self
104+
}
105+
106+
/// Gives up retrying when either the retry loop lasted at least for this given max duration,
107+
/// or `max_attempts` (if set) is reached first.
108+
///
109+
/// **Note:** The real retry loop duration may be higher than the given duration.
110+
/// This is due to the nature of the run operation, which executes the closure on the service and sends the result afterward to Restate.
111+
///
112+
/// Infinite retries if this field and `max_attempts` are unset.
113+
pub fn with_max_duration(mut self, max_duration: Duration) -> Self {
114+
self.max_duration = Some(max_duration);
115+
self
116+
}
117+
}

0 commit comments

Comments
 (0)