Skip to content

Commit da5ea71

Browse files
First pass at the implementation of side effect retry
1 parent 82c0ba3 commit da5ea71

File tree

7 files changed

+231
-115
lines changed

7 files changed

+231
-115
lines changed

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ pin-project-lite = "0.2"
2222
rand = { version = "0.8.5", optional = true }
2323
regress = "0.10"
2424
restate-sdk-macros = { version = "0.2.1", path = "macros" }
25-
restate-sdk-shared-core = { version = "0.0.5" }
25+
restate-sdk-shared-core = { path = "../sdk-shared-core" }
2626
serde = "1.0"
2727
serde_json = "1.0"
2828
thiserror = "1.0.63"

src/context/mod.rs

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ mod request;
1010
mod run;
1111

1212
pub use request::{Request, RequestTarget};
13-
pub use run::RunClosure;
13+
pub use run::{RunClosure, RunRetryPolicy};
1414
pub type HeaderMap = http::HeaderMap<String>;
1515

1616
/// Service handler context.
@@ -385,6 +385,23 @@ pub trait ContextSideEffects<'ctx>: private::SealedContext<'ctx> {
385385
self.inner_context().run(name, run_closure)
386386
}
387387

388+
/// Run a non-deterministic operation and record its result.
389+
///
390+
fn run_with_retry<R, F, T>(
391+
&self,
392+
name: &'ctx str,
393+
retry_policy: RunRetryPolicy,
394+
run_closure: R,
395+
) -> impl Future<Output = Result<T, TerminalError>> + 'ctx
396+
where
397+
R: RunClosure<Fut = F, Output = T> + Send + Sync + 'ctx,
398+
T: Serialize + Deserialize,
399+
F: Future<Output = HandlerResult<T>> + Send + Sync + 'ctx,
400+
{
401+
self.inner_context()
402+
.run_with_retry(name, retry_policy, run_closure)
403+
}
404+
388405
/// Return a random seed inherently predictable, based on the invocation id, which is not secret.
389406
///
390407
/// This value is stable during the invocation lifecycle, thus across retries.

src/context/run.rs

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use crate::errors::HandlerResult;
22
use crate::serde::{Deserialize, Serialize};
33
use std::future::Future;
4+
use std::time::Duration;
45

56
/// Run closure trait
67
pub trait RunClosure {
@@ -23,3 +24,72 @@ where
2324
self()
2425
}
2526
}
27+
28+
/// This struct represents the policy to execute retries for run closures.
29+
#[derive(Debug, Clone)]
30+
pub struct RunRetryPolicy {
31+
pub(crate) initial_interval: Duration,
32+
pub(crate) factor: f32,
33+
pub(crate) max_interval: Option<Duration>,
34+
pub(crate) max_attempts: Option<u32>,
35+
pub(crate) max_duration: Option<Duration>,
36+
}
37+
38+
impl Default for RunRetryPolicy {
39+
fn default() -> Self {
40+
Self {
41+
initial_interval: Duration::from_millis(100),
42+
factor: 2.0,
43+
max_interval: Some(Duration::from_secs(2)),
44+
max_attempts: None,
45+
max_duration: Some(Duration::from_secs(50)),
46+
}
47+
}
48+
}
49+
50+
impl RunRetryPolicy {
51+
/// Create a new retry policy.
52+
pub fn new() -> Self {
53+
Self {
54+
initial_interval: Duration::from_millis(100),
55+
factor: 1.0,
56+
max_interval: None,
57+
max_attempts: None,
58+
max_duration: None,
59+
}
60+
}
61+
62+
/// Initial interval for the first retry attempt.
63+
pub fn with_initial_interval(mut self, initial_interval: Duration) -> Self {
64+
self.initial_interval = initial_interval;
65+
self
66+
}
67+
68+
/// Maximum interval between retries.
69+
pub fn with_factor(mut self, factor: f32) -> Self {
70+
self.factor = factor;
71+
self
72+
}
73+
74+
/// Maximum interval between retries.
75+
pub fn with_max_interval(mut self, max_interval: Duration) -> Self {
76+
self.max_interval = Some(max_interval);
77+
self
78+
}
79+
80+
/// Gives up retrying when either this number of attempts is reached,
81+
/// or `max_duration` (if set) is reached first.
82+
/// Infinite retries if this field and `max_duration` are unset.
83+
pub fn with_max_attempts(mut self, max_attempts: u32) -> Self {
84+
self.max_attempts = Some(max_attempts);
85+
self
86+
}
87+
88+
/// Gives up retrying when either the retry loop lasted for this given max duration,
89+
/// or `max_attempts` (if set) is reached first.
90+
/// Infinite retries if this field and `max_attempts` are unset.
91+
pub fn with_max_duration(mut self, max_duration: Duration) -> Self {
92+
self.max_duration = Some(max_duration);
93+
self
94+
}
95+
}

0 commit comments

Comments
 (0)