Skip to content

Side effect retry #20

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Aug 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/integration.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ jobs:
name: "Features integration test (sdk-test-suite version ${{ matrix.sdk-test-suite }})"
strategy:
matrix:
sdk-test-suite: [ "1.8" ]
sdk-test-suite: [ "2.0" ]
permissions:
contents: read
issues: read
Expand Down
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ pin-project-lite = "0.2"
rand = { version = "0.8.5", optional = true }
regress = "0.10"
restate-sdk-macros = { version = "0.2.1", path = "macros" }
restate-sdk-shared-core = { version = "0.0.5" }
restate-sdk-shared-core = { git = "https://github.com/restatedev/sdk-shared-core.git", branch = "main" }
serde = "1.0"
serde_json = "1.0"
thiserror = "1.0.63"
Expand Down
2 changes: 1 addition & 1 deletion examples/failures.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ struct MyError;
impl FailureExample for FailureExampleImpl {
async fn do_run(&self, context: Context<'_>) -> HandlerResult<()> {
context
.run("get_ip", || async move {
.run(|| async move {
if rand::thread_rng().next_u32() % 4 == 0 {
return Err(TerminalError::new("Failed!!!").into());
}
Expand Down
3 changes: 2 additions & 1 deletion examples/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ struct RunExampleImpl(reqwest::Client);
impl RunExample for RunExampleImpl {
async fn do_run(&self, context: Context<'_>) -> HandlerResult<Json<HashMap<String, String>>> {
let res = context
.run("get_ip", || async move {
.run(|| async move {
let req = self.0.get("https://httpbin.org/ip").build()?;

let res = self
Expand All @@ -23,6 +23,7 @@ impl RunExample for RunExampleImpl {

Ok(Json::from(res))
})
.named("get_ip")
.await?
.into_inner();

Expand Down
15 changes: 6 additions & 9 deletions src/context/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ mod request;
mod run;

pub use request::{Request, RequestTarget};
pub use run::RunClosure;
pub use run::{RunClosure, RunFuture, RunRetryPolicy};

pub type HeaderMap = http::HeaderMap<String>;

/// Service handler context.
Expand Down Expand Up @@ -371,18 +372,14 @@ impl<'ctx, CTX: private::SealedContext<'ctx>> ContextAwakeables<'ctx> for CTX {}
/// Trait exposing Restate functionalities to deal with non-deterministic operations.
pub trait ContextSideEffects<'ctx>: private::SealedContext<'ctx> {
/// Run a non-deterministic operation and record its result.
///
fn run<R, F, T>(
&self,
name: &'ctx str,
run_closure: R,
) -> impl Future<Output = Result<T, TerminalError>> + 'ctx
#[must_use]
fn run<R, F, T>(&self, run_closure: R) -> impl RunFuture<Result<T, TerminalError>> + 'ctx
where
R: RunClosure<Fut = F, Output = T> + Send + Sync + 'ctx,
T: Serialize + Deserialize,
F: Future<Output = HandlerResult<T>> + Send + Sync + 'ctx,
T: Serialize + Deserialize + 'static,
{
self.inner_context().run(name, run_closure)
self.inner_context().run(run_closure)
}

/// Return a random seed inherently predictable, based on the invocation id, which is not secret.
Expand Down
92 changes: 92 additions & 0 deletions src/context/run.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::errors::HandlerResult;
use crate::serde::{Deserialize, Serialize};
use std::future::Future;
use std::time::Duration;

/// Run closure trait
pub trait RunClosure {
Expand All @@ -23,3 +24,94 @@ where
self()
}
}

/// Future created using [`super::ContextSideEffects::run`].
pub trait RunFuture<O>: Future<Output = O> {
/// Provide a custom retry policy for this `run` operation.
///
/// If unspecified, the `run` will be retried using the [Restate invoker retry policy](https://docs.restate.dev/operate/configuration/server),
/// which by default retries indefinitely.
fn with_retry_policy(self, retry_policy: RunRetryPolicy) -> Self;

/// Define a name for this `run` operation.
///
/// This is used mainly for observability.
fn named(self, name: impl Into<String>) -> Self;
}

/// This struct represents the policy to execute retries for run closures.
#[derive(Debug, Clone)]
pub struct RunRetryPolicy {
pub(crate) initial_interval: Duration,
pub(crate) factor: f32,
pub(crate) max_interval: Option<Duration>,
pub(crate) max_attempts: Option<u32>,
pub(crate) max_duration: Option<Duration>,
}

impl Default for RunRetryPolicy {
fn default() -> Self {
Self {
initial_interval: Duration::from_millis(100),
factor: 2.0,
max_interval: Some(Duration::from_secs(2)),
max_attempts: None,
max_duration: Some(Duration::from_secs(50)),
}
}
}

impl RunRetryPolicy {
/// Create a new retry policy.
pub fn new() -> Self {
Self {
initial_interval: Duration::from_millis(100),
factor: 1.0,
max_interval: None,
max_attempts: None,
max_duration: None,
}
}

/// Initial interval for the first retry attempt.
pub fn with_initial_interval(mut self, initial_interval: Duration) -> Self {
self.initial_interval = initial_interval;
self
}

/// Maximum interval between retries.
pub fn with_factor(mut self, factor: f32) -> Self {
self.factor = factor;
self
}

/// Maximum interval between retries.
pub fn with_max_interval(mut self, max_interval: Duration) -> Self {
self.max_interval = Some(max_interval);
self
}

/// Gives up retrying when either at least the given number of attempts is reached,
/// or `max_duration` (if set) is reached first.
///
/// **Note:** The number of actual retries may be higher than the provided value.
/// This is due to the nature of the run operation, which executes the closure on the service and sends the result afterward to Restate.
///
/// Infinite retries if this field and `max_duration` are unset.
pub fn with_max_attempts(mut self, max_attempts: u32) -> Self {
self.max_attempts = Some(max_attempts);
self
}

/// Gives up retrying when either the retry loop lasted at least for this given max duration,
/// or `max_attempts` (if set) is reached first.
///
/// **Note:** The real retry loop duration may be higher than the given duration.
/// This is due to the nature of the run operation, which executes the closure on the service and sends the result afterward to Restate.
///
/// Infinite retries if this field and `max_attempts` are unset.
pub fn with_max_duration(mut self, max_duration: Duration) -> Self {
self.max_duration = Some(max_duration);
self
}
}
Loading