diff --git a/examples/services/mod.rs b/examples/services/mod.rs new file mode 100644 index 0000000..fc734f9 --- /dev/null +++ b/examples/services/mod.rs @@ -0,0 +1,3 @@ +pub mod my_service; +pub mod my_virtual_object; +pub mod my_workflow; \ No newline at end of file diff --git a/examples/services/my_service.rs b/examples/services/my_service.rs new file mode 100644 index 0000000..87f2518 --- /dev/null +++ b/examples/services/my_service.rs @@ -0,0 +1,22 @@ +use restate_sdk::prelude::*; + +#[restate_sdk::service] +pub trait MyService { + async fn my_handler(greeting: String) -> Result; +} + +pub struct MyServiceImpl; + +impl MyService for MyServiceImpl { + async fn my_handler(&self, _ctx: Context<'_>, greeting: String) -> Result { + Ok(format!("{greeting}!")) + } +} + +#[tokio::main] +async fn main() { + tracing_subscriber::fmt::init(); + HttpServer::new(Endpoint::builder().bind(MyServiceImpl.serve()).build()) + .listen_and_serve("0.0.0.0:9080".parse().unwrap()) + .await; +} diff --git a/examples/services/my_virtual_object.rs b/examples/services/my_virtual_object.rs new file mode 100644 index 0000000..42813e8 --- /dev/null +++ b/examples/services/my_virtual_object.rs @@ -0,0 +1,39 @@ +use restate_sdk::prelude::*; + +#[restate_sdk::object] +pub trait MyVirtualObject { + async fn my_handler(name: String) -> Result; + #[shared] + async fn my_concurrent_handler(name: String) -> Result; +} + +pub struct MyVirtualObjectImpl; + +impl MyVirtualObject for MyVirtualObjectImpl { + async fn my_handler( + &self, + ctx: ObjectContext<'_>, + greeting: String, + ) -> Result { + Ok(format!("Greetings {} {}", greeting, ctx.key())) + } + async fn my_concurrent_handler( + &self, + ctx: SharedObjectContext<'_>, + greeting: String, + ) -> Result { + Ok(format!("Greetings {} {}", greeting, ctx.key())) + } +} + +#[tokio::main] +async fn main() { + tracing_subscriber::fmt::init(); + HttpServer::new( + Endpoint::builder() + .bind(MyVirtualObjectImpl.serve()) + .build(), + ) + .listen_and_serve("0.0.0.0:9080".parse().unwrap()) + .await; +} diff --git a/examples/services/my_workflow.rs b/examples/services/my_workflow.rs new file mode 100644 index 0000000..c1af22d --- /dev/null +++ b/examples/services/my_workflow.rs @@ -0,0 +1,35 @@ +use restate_sdk::prelude::*; + +#[restate_sdk::workflow] +pub trait MyWorkflow { + async fn run(req: String) -> Result; + #[shared] + async fn interact_with_workflow() -> Result<(), HandlerError>; +} + +pub struct MyWorkflowImpl; + +impl MyWorkflow for MyWorkflowImpl { + async fn run(&self, _ctx: WorkflowContext<'_>, _req: String) -> Result { + // implement workflow logic here + + Ok(String::from("success")) + } + async fn interact_with_workflow( + &self, + _ctx: SharedWorkflowContext<'_>, + ) -> Result<(), HandlerError> { + // implement interaction logic here + // e.g. resolve a promise that the workflow is waiting on + + Ok(()) + } +} + +#[tokio::main] +async fn main() { + tracing_subscriber::fmt::init(); + HttpServer::new(Endpoint::builder().bind(MyWorkflowImpl.serve()).build()) + .listen_and_serve("0.0.0.0:9080".parse().unwrap()) + .await; +} diff --git a/src/context/mod.rs b/src/context/mod.rs index 7c99a63..bf9da1e 100644 --- a/src/context/mod.rs +++ b/src/context/mod.rs @@ -8,7 +8,6 @@ use std::time::Duration; mod request; mod run; - pub use request::{Request, RequestTarget}; pub use run::{RunClosure, RunFuture, RunRetryPolicy}; @@ -207,7 +206,47 @@ impl<'ctx> WorkflowContext<'ctx> { } } -/// Trait exposing Restate timers functionalities. +/// +/// # Scheduling & Timers +/// The Restate SDK includes durable timers. +/// You can use these to let handlers sleep for a specified time, or to schedule a handler to be called at a later time. +/// These timers are resilient to failures and restarts. +/// Restate stores and keeps track of the timers and triggers them on time, even across failures and restarts. +/// +/// ## Scheduling Async Tasks +/// +/// To schedule a handler to be called at a later time, have a look at the documentation on [delayed calls][crate::context::ContextClient#delayed-calls]. +/// +/// +/// ## Durable sleep +/// To sleep in a Restate application for ten seconds, do the following: +/// +/// ```rust,no_run +/// # use restate_sdk::prelude::*; +/// # use std::convert::Infallible; +/// # use std::time::Duration; +/// # +/// # async fn handle(ctx: Context<'_>) -> Result<(), HandlerError> { +/// ctx.sleep(Duration::from_secs(10)).await?; +/// # Ok(()) +/// # } +/// ``` +/// +/// **Cost savings on FaaS**: +/// Restate suspends the handler while it is sleeping, to free up resources. +/// This is beneficial for AWS Lambda deployments, since you don't pay for the time the handler is sleeping. +/// +/// **Sleeping in Virtual Objects**: +/// Virtual Objects only process a single invocation at a time, so the Virtual Object will be blocked while sleeping. +/// +///
+/// Clock synchronization Restate Server vs. SDK +/// +/// The Restate SDK calculates the wake-up time based on the delay you specify. +/// The Restate Server then uses this calculated time to wake up the handler. +/// If the Restate Server and the SDK have different system clocks, the sleep duration might not be accurate. +/// So make sure that the system clock of the Restate Server and the SDK have the same timezone and are synchronized. +///
pub trait ContextTimers<'ctx>: private::SealedContext<'ctx> { /// Sleep using Restate fn sleep(&self, duration: Duration) -> impl Future> + 'ctx { @@ -217,7 +256,173 @@ pub trait ContextTimers<'ctx>: private::SealedContext<'ctx> { impl<'ctx, CTX: private::SealedContext<'ctx>> ContextTimers<'ctx> for CTX {} -/// Trait exposing Restate service to service communication functionalities. +/// # Service Communication +/// +/// A handler can call another handler and wait for the response (request-response), or it can send a message without waiting for the response. +/// +/// ## Request-response calls +/// +/// Request-response calls are requests where the client waits for the response. +/// +/// You can do request-response calls to Services, Virtual Objects, and Workflows, in the following way: +/// +/// ```rust,no_run +/// # #[path = "../../examples/services/mod.rs"] +/// # mod services; +/// # use services::my_virtual_object::MyVirtualObjectClient; +/// # use services::my_service::MyServiceClient; +/// # use services::my_workflow::MyWorkflowClient; +/// # use restate_sdk::prelude::*; +/// # +/// # async fn greet(ctx: Context<'_>, greeting: String) -> Result<(), HandlerError> { +/// // To a Service: +/// let service_response = ctx +/// .service_client::() +/// .my_handler(String::from("Hi!")) +/// .call() +/// .await?; +/// +/// // To a Virtual Object: +/// let object_response = ctx +/// .object_client::("Mary") +/// .my_handler(String::from("Hi!")) +/// .call() +/// .await?; +/// +/// // To a Workflow: +/// let workflow_result = ctx +/// .workflow_client::("my-workflow-id") +/// .run(String::from("Hi!")) +/// .call() +/// .await?; +/// ctx.workflow_client::("my-workflow-id") +/// .interact_with_workflow() +/// .call() +/// .await?; +/// # Ok(()) +/// # } +/// ``` +/// +/// 1. **Create a service client**. +/// - For Virtual Objects, you also need to supply the key of the Virtual Object you want to call, here `"Mary"`. +/// - For Workflows, you need to supply a workflow ID that is unique per workflow execution. +/// 2. **Specify the handler** you want to call and supply the request. +/// 3. **Await** the call to retrieve the response. +/// +/// **No need for manual retry logic**: +/// Restate proxies all the calls and logs them in the journal. +/// In case of failures, Restate takes care of retries, so you don't need to implement this yourself here. +/// +/// ## Sending messages +/// +/// Handlers can send messages (a.k.a. one-way calls, or fire-and-forget calls), as follows: +/// +/// ```rust,no_run +/// # #[path = "../../examples/services/mod.rs"] +/// # mod services; +/// # use services::my_virtual_object::MyVirtualObjectClient; +/// # use services::my_service::MyServiceClient; +/// # use services::my_workflow::MyWorkflowClient; +/// # use restate_sdk::prelude::*; +/// # +/// # async fn greet(ctx: Context<'_>, greeting: String) -> Result<(), HandlerError> { +/// // To a Service: +/// ctx.service_client::() +/// .my_handler(String::from("Hi!")) +/// .send(); +/// +/// // To a Virtual Object: +/// ctx.object_client::("Mary") +/// .my_handler(String::from("Hi!")) +/// .send(); +/// +/// // To a Workflow: +/// ctx.workflow_client::("my-workflow-id") +/// .run(String::from("Hi!")) +/// .send(); +/// ctx.workflow_client::("my-workflow-id") +/// .interact_with_workflow() +/// .send(); +/// # Ok(()) +/// # } +/// ``` +/// +/// **No need for message queues**: +/// Without Restate, you would usually put a message queue in between the two services, to guarantee the message delivery. +/// Restate eliminates the need for a message queue because Restate durably logs the request and makes sure it gets executed. +/// +/// ## Delayed calls +/// +/// A delayed call is a one-way call that gets executed after a specified delay. +/// +/// To schedule a delayed call, send a message with a delay parameter, as follows: +/// +/// ```rust,no_run +/// # #[path = "../../examples/services/mod.rs"] +/// # mod services; +/// # use services::my_virtual_object::MyVirtualObjectClient; +/// # use services::my_service::MyServiceClient; +/// # use services::my_workflow::MyWorkflowClient; +/// # use restate_sdk::prelude::*; +/// # use std::time::Duration; +/// # +/// # async fn greet(ctx: Context<'_>, greeting: String) -> Result<(), HandlerError> { +/// // To a Service: +/// ctx.service_client::() +/// .my_handler(String::from("Hi!")) +/// .send_with_delay(Duration::from_millis(5000)); +/// +/// // To a Virtual Object: +/// ctx.object_client::("Mary") +/// .my_handler(String::from("Hi!")) +/// .send_with_delay(Duration::from_millis(5000)); +/// +/// // To a Workflow: +/// ctx.workflow_client::("my-workflow-id") +/// .run(String::from("Hi!")) +/// .send_with_delay(Duration::from_millis(5000)); +/// ctx.workflow_client::("my-workflow-id") +/// .interact_with_workflow() +/// .send_with_delay(Duration::from_millis(5000)); +/// # Ok(()) +/// # } +/// ``` +/// +/// You can also use this functionality to schedule async tasks. +/// Restate will make sure the task gets executed at the desired time. +/// +/// ### Ordering guarantees in Virtual Objects +/// Invocations to a Virtual Object are executed serially. +/// Invocations will execute in the same order in which they arrive at Restate. +/// For example, assume a handler calls the same Virtual Object twice: +/// +/// ```rust,no_run +/// # #[path = "../../examples/services/my_virtual_object.rs"] +/// # mod my_virtual_object; +/// # use my_virtual_object::MyVirtualObjectClient; +/// # use restate_sdk::prelude::*; +/// # async fn greet(ctx: Context<'_>, greeting: String) -> Result<(), HandlerError> { +/// ctx.object_client::("Mary") +/// .my_handler(String::from("I'm call A!")) +/// .send(); +/// ctx.object_client::("Mary") +/// .my_handler(String::from("I'm call B!")) +/// .send(); +/// # Ok(()) +/// } +/// ``` +/// +/// It is guaranteed that call A will execute before call B. +/// It is not guaranteed though that call B will be executed immediately after call A, as invocations coming from other handlers/sources, could interleave these two calls. +/// +/// ### Deadlocks with Virtual Objects +/// Request-response calls to Virtual Objects can lead to deadlocks, in which the Virtual Object remains locked and can't process any more requests. +/// Some example cases: +/// - Cross deadlock between Virtual Object A and B: A calls B, and B calls A, both using same keys. +/// - Cyclical deadlock: A calls B, and B calls C, and C calls A again. +/// +/// In this situation, you can use the CLI to unblock the Virtual Object manually by [cancelling invocations](https://docs.restate.dev/operate/invocation#cancelling-invocations). +/// pub trait ContextClient<'ctx>: private::SealedContext<'ctx> { /// Create a [`Request`]. fn request( @@ -339,12 +544,83 @@ pub trait IntoWorkflowClient<'ctx>: Sized { impl<'ctx, CTX: private::SealedContext<'ctx>> ContextClient<'ctx> for CTX {} -/// Trait exposing Restate awakeables functionalities. +/// # Awakeables +/// +/// Awakeables pause an invocation while waiting for another process to complete a task. +/// You can use this pattern to let a handler execute a task somewhere else and retrieve the result. +/// This pattern is also known as the callback (task token) pattern. +/// +/// ## Creating awakeables +/// +/// 1. **Create an awakeable**. This contains a String identifier and a Promise. +/// 2. **Trigger a task/process** and attach the awakeable ID (e.g., over Kafka, via HTTP, ...). +/// For example, send an HTTP request to a service that executes the task, and attach the ID to the payload. +/// You use `ctx.run` to avoid re-triggering the task on retries. +/// 3. **Wait** until the other process has executed the task. +/// The handler **receives the payload and resumes**. +/// +/// ```rust,no_run +/// # use restate_sdk::prelude::*; +/// # +/// # async fn handle(ctx: Context<'_>) -> Result<(), HandlerError> { +/// /// 1. Create an awakeable +/// let (id, promise) = ctx.awakeable::(); +/// +/// /// 2. Trigger a task +/// ctx.run(|| trigger_task_and_deliver_id(id.clone())).await?; +/// +/// /// 3. Wait for the promise to be resolved +/// let payload = promise.await?; +/// # Ok(()) +/// # } +/// # async fn trigger_task_and_deliver_id(awakeable_id: String) -> Result<(), HandlerError>{ +/// # Ok(()) +/// # } +/// ``` +/// +/// +/// ## Completing awakeables +/// +/// The external process completes the awakeable by either resolving it with an optional payload or by rejecting it +/// with its ID and a reason for the failure. This throws [a terminal error][crate::errors] in the waiting handler. +/// +/// - Resolving over HTTP with its ID and an optional payload: +/// +/// ```text +/// curl localhost:8080/restate/awakeables/prom_1PePOqp/resolve +/// -H 'content-type: application/json' +/// -d '{"hello": "world"}' +/// ``` +/// +/// - Rejecting over HTTP with its ID and a reason: +/// +/// ```text +/// curl localhost:8080/restate/awakeables/prom_1PePOqp/reject +/// -H 'content-type: text/plain' \ +/// -d 'Very bad error!' +/// ``` /// -/// Awakeables can be used to implement external asynchronous systems interactions, -/// for example you can send a Kafka record including the awakeable id returned by [`ContextAwakeables::awakeable`], -/// and then let another service consume from Kafka the responses of given external system interaction by using -/// [`ContextAwakeables::resolve_awakeable`] or [`ContextAwakeables::reject_awakeable`]. +/// - Resolving via the SDK with its ID and an optional payload, or rejecting with its ID and a reason: +/// +/// ```rust,no_run +/// # use restate_sdk::prelude::*; +/// # +/// # async fn handle(ctx: Context<'_>, id: String) -> Result<(), HandlerError> { +/// // Resolve the awakeable +/// ctx.resolve_awakeable(&id, "hello".to_string()); +/// +/// // Or reject the awakeable +/// ctx.reject_awakeable(&id, TerminalError::new("my error reason")); +/// # Ok(()) +/// # } +/// ``` +/// +/// For more info about serialization of the payload, see [crate::serde] +/// +/// When running on Function-as-a-Service platforms, such as AWS Lambda, Restate suspends the handler while waiting for the awakeable to be completed. +/// Since you only pay for the time that the handler is actually running, you don't pay while waiting for the external process to return. +/// +/// **Be aware**: Virtual Objects only process a single invocation at a time, so the Virtual Object will be blocked while waiting on the awakeable to be resolved. pub trait ContextAwakeables<'ctx>: private::SealedContext<'ctx> { /// Create an awakeable fn awakeable( @@ -369,9 +645,42 @@ pub trait ContextAwakeables<'ctx>: private::SealedContext<'ctx> { impl<'ctx, CTX: private::SealedContext<'ctx>> ContextAwakeables<'ctx> for CTX {} -/// Trait exposing Restate functionalities to deal with non-deterministic operations. +/// # Journaling Results +/// +/// Restate uses an execution log for replay after failures and suspensions. +/// This means that non-deterministic results (e.g. database responses, UUID generation) need to be stored in the execution log. +/// The SDK offers some functionalities to help you with this: +/// 1. **[Journaled actions][crate::context::ContextSideEffects::run]**: Run any block of code and store the result in Restate. Restate replays the result instead of re-executing the block on retries. +/// 2. **[UUID generator][crate::context::ContextSideEffects::rand_uuid]**: Built-in helpers for generating stable UUIDs. Restate seeds the random number generator with the invocation ID, so it always returns the same value on retries. +/// 3. **[Random generator][crate::context::ContextSideEffects::rand]**: Built-in helpers for generating randoms. Restate seeds the random number generator with the invocation ID, so it always returns the same value on retries. +/// pub trait ContextSideEffects<'ctx>: private::SealedContext<'ctx> { - /// Run a non-deterministic operation and record its result. + /// ## Journaled actions + /// You can store the result of a (non-deterministic) operation in the Restate execution log (e.g. database requests, HTTP calls, etc). + /// Restate replays the result instead of re-executing the operation on retries. + /// + /// Here is an example of a database request for which the string response is stored in Restate: + /// ```rust,no_run + /// # use restate_sdk::prelude::*; + /// # async fn handle(ctx: Context<'_>) -> Result<(), HandlerError> { + /// let response = ctx.run(|| do_db_request()).await?; + /// # Ok(()) + /// # } + /// # async fn do_db_request() -> Result{ + /// # Ok("Hello".to_string()) + /// # } + /// ``` + /// + /// You cannot use the Restate context within `ctx.run`. + /// This includes actions such as getting state, calling another service, and nesting other journaled actions. + /// + /// For more info about serialization of the return values, see [crate::serde]. + /// + /// **Caution: Immediately await journaled actions:** + /// Always immediately await `ctx.run`, before doing any other context calls. + /// If not, you might bump into non-determinism errors during replay, + /// because the journaled result can get interleaved with the other context calls in the journal in a non-deterministic way. + /// #[must_use] fn run(&self, run_closure: R) -> impl RunFuture> + 'ctx where @@ -389,16 +698,41 @@ pub trait ContextSideEffects<'ctx>: private::SealedContext<'ctx> { private::SealedContext::random_seed(self) } + /// ### Generating random numbers + /// /// Return a [`rand::Rng`] instance inherently predictable, seeded with [`ContextSideEffects::random_seed`]. /// + /// For example, you can use this to generate a random number: + /// + /// ```rust,no_run + /// # use restate_sdk::prelude::*; + /// # use rand::Rng; + /// async fn rand_generate(mut ctx: Context<'_>) { + /// let x: u32 = ctx.rand().gen(); + /// # } + /// ``` + /// /// This instance is useful to generate identifiers, idempotency keys, and for uniform sampling from a set of options. /// If a cryptographically secure value is needed, please generate that externally using [`ContextSideEffects::run`]. #[cfg(feature = "rand")] fn rand(&mut self) -> &mut rand::prelude::StdRng { private::SealedContext::rand(self) } - - /// Return a random [`uuid::Uuid`], generated using [`ContextSideEffects::rand`]. + /// ### Generating UUIDs + /// + /// Returns a random [`uuid::Uuid`], generated using [`ContextSideEffects::rand`]. + /// + /// You can use these UUIDs to generate stable idempotency keys, to deduplicate operations. For example, you can use this to let a payment service avoid duplicate payments during retries. + /// + /// Do not use this in cryptographic contexts. + /// + /// ```rust,no_run + /// # use restate_sdk::prelude::*; + /// # use uuid::Uuid; + /// # async fn uuid_generate(mut ctx: Context<'_>) { + /// let uuid: Uuid = ctx.rand_uuid(); + /// # } + /// ``` #[cfg(all(feature = "rand", feature = "uuid"))] fn rand_uuid(&mut self) -> uuid::Uuid { let rand = private::SealedContext::rand(self); @@ -408,7 +742,48 @@ pub trait ContextSideEffects<'ctx>: private::SealedContext<'ctx> { impl<'ctx, CTX: private::SealedContext<'ctx>> ContextSideEffects<'ctx> for CTX {} -/// Trait exposing Restate K/V read functionalities. +/// # Reading state +/// You can store key-value state in Restate. +/// Restate makes sure the state is consistent with the processing of the code execution. +/// +/// **This feature is only available for Virtual Objects and Workflows:** +/// - For **Virtual Objects**, the state is isolated per Virtual Object and lives forever (across invocations for that object). +/// - For **Workflows**, you can think of it as if every workflow execution is a new object. So the state is isolated to a single workflow execution. The state can only be mutated by the `run` handler of the workflow. The other handlers can only read the state. +/// +/// ```rust,no_run +/// # use restate_sdk::prelude::*; +/// # +/// # async fn my_handler(ctx: ObjectContext<'_>) -> Result<(), HandlerError> { +/// /// 1. Listing state keys +/// /// List all the state keys that have entries in the state store for this object, via: +/// let keys = ctx.get_keys().await?; +/// +/// /// 2. Getting a state value +/// let my_string = ctx +/// .get::("my-key") +/// .await? +/// .unwrap_or(String::from("my-default")); +/// let my_number = ctx.get::("my-number-key").await?.unwrap_or_default(); +/// +/// /// 3. Setting a state value +/// ctx.set("my-key", String::from("my-value")); +/// +/// /// 4. Clearing a state value +/// ctx.clear("my-key"); +/// +/// /// 5. Clearing all state values +/// /// Deletes all the state in Restate for the object +/// ctx.clear_all(); +/// # Ok(()) +/// # } +/// ``` +/// +/// For more info about serialization, see [crate::serde] +/// +/// ### Command-line introspection +/// You can inspect and edit the K/V state stored in Restate via `psql` and the CLI. +/// Have a look at the [introspection docs](https://docs.restate.dev//operate/introspection#inspecting-application-state) for more information. +/// pub trait ContextReadState<'ctx>: private::SealedContext<'ctx> { /// Get state fn get( @@ -429,7 +804,48 @@ impl<'ctx, CTX: private::SealedContext<'ctx> + private::SealedCanReadState> Cont { } -/// Trait exposing Restate K/V write functionalities. +/// # Writing State +/// You can store key-value state in Restate. +/// Restate makes sure the state is consistent with the processing of the code execution. +/// +/// **This feature is only available for Virtual Objects and Workflows:** +/// - For **Virtual Objects**, the state is isolated per Virtual Object and lives forever (across invocations for that object). +/// - For **Workflows**, you can think of it as if every workflow execution is a new object. So the state is isolated to a single workflow execution. The state can only be mutated by the `run` handler of the workflow. The other handlers can only read the state. +/// +/// ```rust,no_run +/// # use restate_sdk::prelude::*; +/// # +/// # async fn my_handler(ctx: ObjectContext<'_>) -> Result<(), HandlerError> { +/// /// 1. Listing state keys +/// /// List all the state keys that have entries in the state store for this object, via: +/// let keys = ctx.get_keys().await?; +/// +/// /// 2. Getting a state value +/// let my_string = ctx +/// .get::("my-key") +/// .await? +/// .unwrap_or(String::from("my-default")); +/// let my_number = ctx.get::("my-number-key").await?.unwrap_or_default(); +/// +/// /// 3. Setting a state value +/// ctx.set("my-key", String::from("my-value")); +/// +/// /// 4. Clearing a state value +/// ctx.clear("my-key"); +/// +/// /// 5. Clearing all state values +/// /// Deletes all the state in Restate for the object +/// ctx.clear_all(); +/// # Ok(()) +/// # } +/// ``` +/// +/// For more info about serialization, see [crate::serde] +/// +/// ## Command-line introspection +/// You can inspect and edit the K/V state stored in Restate via `psql` and the CLI. +/// Have a look at the [introspection docs](https://docs.restate.dev//operate/introspection#inspecting-application-state) for more information. +/// pub trait ContextWriteState<'ctx>: private::SealedContext<'ctx> { /// Set state fn set(&self, key: &str, t: T) { diff --git a/src/errors.rs b/src/errors.rs index dc7f2ff..b56064e 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -1,5 +1,21 @@ -//! Error types to use within handlers. - +//! # Error Handling +//! +//! Restate handles retries for failed invocations. +//! By default, Restate does infinite retries with an exponential backoff strategy. +//! +//! For failures for which you do not want retries, but instead want the invocation to end and the error message +//! to be propagated back to the caller, you can return a [`TerminalError`]. +//! +//! You can return a [`TerminalError`] with an optional HTTP status code and a message anywhere in your handler, as follows: +//! +//! ```rust,no_run +//! # use restate_sdk::prelude::*; +//! # async fn handle() -> Result<(), HandlerError> { +//! Err(TerminalError::new("This is a terminal error").into()) +//! # } +//! ``` +//! +//! You can catch terminal exceptions. For example, you can catch the terminal exception that comes out of a [call to another service][crate::context::ContextClient], and build your control flow around it. use restate_sdk_shared_core::Failure; use std::error::Error as StdError; use std::fmt; diff --git a/src/http_server.rs b/src/http_server.rs index b93f076..fb78a7a 100644 --- a/src/http_server.rs +++ b/src/http_server.rs @@ -1,4 +1,61 @@ -//! Battery-packed HTTP server to expose services. +//! # Serving +//! Restate services run as an HTTP endpoint. +//! +//! ## Creating an HTTP endpoint +//! 1. Create the endpoint +//! 2. Bind one or multiple services to it. +//! 3. Listen on the specified port (default `9080`) for connections and requests. +//! +//! ```rust,no_run +//! # #[path = "../examples/services/mod.rs"] +//! # mod services; +//! # use services::my_service::{MyService, MyServiceImpl}; +//! # use services::my_virtual_object::{MyVirtualObject, MyVirtualObjectImpl}; +//! # use services::my_workflow::{MyWorkflow, MyWorkflowImpl}; +//! use restate_sdk::endpoint::Endpoint; +//! use restate_sdk::http_server::HttpServer; +//! +//! #[tokio::main] +//! async fn main() { +//! HttpServer::new( +//! Endpoint::builder() +//! .bind(MyServiceImpl.serve()) +//! .bind(MyVirtualObjectImpl.serve()) +//! .bind(MyWorkflowImpl.serve()) +//! .build(), +//! ) +//! .listen_and_serve("0.0.0.0:9080".parse().unwrap()) +//! .await; +//! } +//! ``` +//! +//! +//! ## Validating request identity +//! +//! SDKs can validate that incoming requests come from a particular Restate +//! instance. You can find out more about request identity in the [Security docs](https://docs.restate.dev/operate/security#locking-down-service-access). +//! Add the identity key to your endpoint as follows: +//! +//! ```rust,no_run +//! # #[path = "../examples/services/mod.rs"] +//! # mod services; +//! # use services::my_service::{MyService, MyServiceImpl}; +//! # use restate_sdk::endpoint::Endpoint; +//! # use restate_sdk::http_server::HttpServer; +//! # +//! # #[tokio::main] +//! # async fn main() { +//! HttpServer::new( +//! Endpoint::builder() +//! .bind(MyServiceImpl.serve()) +//! .identity_key("publickeyv1_w7YHemBctH5Ck2nQRQ47iBBqhNHy4FV7t2Usbye2A6f") +//! .unwrap() +//! .build(), +//! ) +//! .listen_and_serve("0.0.0.0:9080".parse().unwrap()) +//! .await; +//! # } +//! ``` use crate::endpoint::Endpoint; use crate::hyper::HyperEndpoint; diff --git a/src/lib.rs b/src/lib.rs index 91b6bd0..17dc0de 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,32 +1,126 @@ //! # Restate Rust SDK //! -//! [Restate](https://restate.dev/) is a system for easily building resilient applications using _distributed durable async/await_. +//! [Restate](https://restate.dev/) is a system for easily building resilient applications. //! This crate is the Restate SDK for writing Restate services using Rust. //! +//! ## New to Restate? +//! +//! If you are new to Restate, we recommend the following resources: +//! +//! - [Learn about the concepts of Restate](https://docs.restate.dev/concepts/durable_building_blocks) +//! - Use cases: +//! - [Workflows](https://docs.restate.dev/use-cases/workflows) +//! - [Microservice orchestration](https://docs.restate.dev/use-cases/microservice-orchestration) +//! - [Event processing](https://docs.restate.dev/use-cases/event-processing) +//! - [Async tasks](https://docs.restate.dev/use-cases/async-tasks) +//! - [Quickstart](https://docs.restate.dev/get_started/quickstart?sdk=rust) +//! - [Do the Tour of Restate to try out the APIs](https://docs.restate.dev/get_started/tour/?sdk=rust) +//! +//! # Features +//! +//! Have a look at the following SDK capabilities: +//! +//! - [Service Communication][crate::context::ContextClient]: Durable RPC and messaging between services (optionally with a delay). +//! - [Journaling Results][crate::context::ContextSideEffects]: Persist results in Restate's log to avoid re-execution on retries +//! - State: [read][crate::context::ContextReadState] and [write](crate::context::ContextWriteState): Store and retrieve state in Restate's key-value store +//! - [Scheduling & Timers][crate::context::ContextTimers]: Let a handler pause for a certain amount of time. Restate durably tracks the timer across failures. +//! - [Awakeables][crate::context::ContextAwakeables]: Durable Futures to wait for events and the completion of external tasks. +//! - [Error Handling][crate::errors]: Restate retries failures infinitely. Use `TerminalError` to stop retries. +//! - [Serialization][crate::serde]: The SDK serializes results to send them to the Server. +//! - [Serving][crate::http_server]: Start an HTTP server to expose services. +//! +//! # SDK Overview +//! +//! The Restate Rust SDK lets you implement durable handlers. Handlers can be part of three types of services: +//! +//! - [Services](https://docs.restate.dev/concepts/services/#services-1): a collection of durable handlers +//! - [Virtual Objects](https://docs.restate.dev/concepts/services/#virtual-objects): an object consists of a collection of durable handlers and isolated K/V state. Virtual Objects are useful for modeling stateful entities, where at most one handler can run at a time per object. +//! - [Workflows](https://docs.restate.dev/concepts/services/#workflows): Workflows have a `run` handler that executes exactly once per workflow instance, and executes a set of steps durably. Workflows can have other handlers that can be called multiple times and interact with the workflow. +//! +//! ## Services +//! +//! [Services](https://docs.restate.dev/concepts/services/#services-1) and their handlers are defined as follows: +//! //! ```rust,no_run //! // The prelude contains all the imports you need to get started //! use restate_sdk::prelude::*; //! //! // Define the service using Rust traits //! #[restate_sdk::service] -//! trait Greeter { -//! async fn greet(name: String) -> HandlerResult; +//! trait MyService { +//! async fn my_handler(greeting: String) -> Result; //! } //! //! // Implement the service -//! struct GreeterImpl; -//! impl Greeter for GreeterImpl { -//! async fn greet(&self, _: Context<'_>, name: String) -> HandlerResult { -//! Ok(format!("Greetings {name}")) +//! struct MyServiceImpl; +//! impl MyService for MyServiceImpl { +//! +//! async fn my_handler(&self, ctx: Context<'_>, greeting: String) -> Result { +//! Ok(format!("{greeting}!")) //! } +//! //! } //! //! // Start the HTTP server to expose services //! #[tokio::main] //! async fn main() { +//! HttpServer::new(Endpoint::builder().bind(MyServiceImpl.serve()).build()) +//! .listen_and_serve("0.0.0.0:9080".parse().unwrap()) +//! .await; +//! } +//! ``` +//! +//! - Specify that you want to create a service by using the [`#[restate_sdk::service]` macro](restate_sdk_macros::service). +//! - Create a trait with the service handlers. +//! - Handlers can accept zero or one parameter and return a [`Result`]. +//! - The type of the input parameter of the handler needs to implement [`Serialize`](crate::serde::Deserialize) and [`Deserialize`](crate::serde::Deserialize). See [`crate::serde`]. +//! - The Result contains the return value or a [`HandlerError`][crate::errors::HandlerError], which can be a [`TerminalError`](crate::errors::TerminalError) or any other Rust's [`std::error::Error`]. +//! - The service handler can now be called at `/MyService/myHandler`. You can optionally override the handler name used via `#[name = "myHandler"]`. More details on handler invocations can be found in the [docs](https://docs.restate.dev/invoke/http). +//! - Implement the trait on a concrete type, for example on a struct. +//! - The first parameter of a handler after `&self` is always a [`Context`](crate::context::Context) to interact with Restate. +//! The SDK stores the actions you do on the context in the Restate journal to make them durable. +//! - Finally, create an HTTP endpoint and bind the service(s) to it. Listen on the specified port (here 9080) for connections and requests. +//! +//! ## Virtual Objects +//! [Virtual Objects](https://docs.restate.dev/concepts/services/#virtual-objects) and their handlers are defined similarly to services, with the following differences: +//! +//! ```rust,no_run +//!use restate_sdk::prelude::*; +//! +//! #[restate_sdk::object] +//! pub trait MyVirtualObject { +//! async fn my_handler(name: String) -> Result; +//! #[shared] +//! async fn my_concurrent_handler(name: String) -> Result; +//! } +//! +//! pub struct MyVirtualObjectImpl; +//! +//! impl MyVirtualObject for MyVirtualObjectImpl { +//! +//! async fn my_handler( +//! &self, +//! ctx: ObjectContext<'_>, +//! greeting: String, +//! ) -> Result { +//! Ok(format!("{} {}", greeting, ctx.key())) +//! } +//! +//! async fn my_concurrent_handler( +//! &self, +//! ctx: SharedObjectContext<'_>, +//! greeting: String, +//! ) -> Result { +//! Ok(format!("{} {}", greeting, ctx.key())) +//! } +//! +//! } +//! +//! #[tokio::main] +//! async fn main() { //! HttpServer::new( //! Endpoint::builder() -//! .bind(GreeterImpl.serve()) +//! .bind(MyVirtualObjectImpl.serve()) //! .build(), //! ) //! .listen_and_serve("0.0.0.0:9080".parse().unwrap()) @@ -34,7 +128,92 @@ //! } //! ``` //! +//! - Specify that you want to create a Virtual Object by using the [`#[restate_sdk::object]` macro](restate_sdk_macros::object). +//! - The first argument of each handler must be the [`ObjectContext`](crate::context::ObjectContext) parameter. Handlers with the `ObjectContext` parameter can write to the K/V state store. Only one handler can be active at a time per object, to ensure consistency. +//! - You can retrieve the key of the object you are in via [`ObjectContext.key`]. +//! - If you want to have a handler that executes concurrently to the others and doesn't have write access to the K/V state, add `#[shared]` to the handler definition in the trait. +//! Shared handlers need to use the [`SharedObjectContext`](crate::context::SharedObjectContext). +//! You can use these handlers, for example, to read K/V state and expose it to the outside world, or to interact with the blocking handler and resolve awakeables etc. +//! +//! ## Workflows +//! +//! [Workflows](https://docs.restate.dev/concepts/services/#workflows) are a special type of Virtual Objects, their definition is similar but with the following differences: +//! +//! ```rust,no_run +//! use restate_sdk::prelude::*; +//! +//! #[restate_sdk::workflow] +//! pub trait MyWorkflow { +//! async fn run(req: String) -> Result; +//! #[shared] +//! async fn interact_with_workflow() -> Result<(), HandlerError>; +//! } +//! +//! pub struct MyWorkflowImpl; +//! +//! impl MyWorkflow for MyWorkflowImpl { +//! +//! async fn run(&self, ctx: WorkflowContext<'_>, req: String) -> Result { +//! //! implement workflow logic here +//! +//! Ok(String::from("success")) +//! } +//! +//! async fn interact_with_workflow(&self, ctx: SharedWorkflowContext<'_>) -> Result<(), HandlerError> { +//! //! implement interaction logic here +//! //! e.g. resolve a promise that the workflow is waiting on +//! +//! Ok(()) +//! } +//! +//! } +//! +//! #[tokio::main] +//! async fn main() { +//! HttpServer::new(Endpoint::builder().bind(MyWorkflowImpl.serve()).build()) +//! .listen_and_serve("0.0.0.0:9080".parse().unwrap()) +//! .await; +//! } +//! ``` +//! +//! - Specify that you want to create a Workflow by using the [`#[restate_sdk::workflow]` macro](workflow). +//! - The workflow needs to have a `run` handler. +//! - The first argument of the `run` handler must be the [`WorkflowContext`](crate::context::WorkflowContext) parameter. +//! The `WorkflowContext` parameter is used to interact with Restate. +//! The `run` handler executes exactly once per workflow instance. +//! - The other handlers of the workflow are used to interact with the workflow: either query it, or signal it. +//! They use the [`SharedWorkflowContext`](crate::context::SharedWorkflowContext) to interact with the SDK. +//! These handlers can run concurrently with the run handler and can still be called after the run handler has finished. +//! - Have a look at the [workflow docs](workflow) to learn more. +//! +//! +//! Learn more about each service type here: +//! - [Service](restate_sdk_macros::service) +//! - [Virtual Object](object) +//! - [Workflow](workflow) +//! +//! +//! ### Logging +//! +//! This crate uses the [tracing crate][tracing] to emit logs, so you'll need to configure a tracing subscriber to get logs. For example, to configure console logging using `tracing_subscriber::fmt`: +//! ```rust,no_run +//! #[tokio::main] +//! async fn main() { +//! //! To enable logging +//! tracing_subscriber::fmt::init(); +//! +//! // Start http server etc... +//! } +//! ``` +//! +//! For more information, have a look at the [tracing subscriber doc](https://docs.rs/tracing-subscriber/latest/tracing_subscriber/fmt/index.html#filtering-events-with-environment-variables). +//! +//! # References +//! //! For a general overview about Restate, check out the [Restate documentation](https://docs.restate.dev). +//! +//! You can find more Rust examples in the [examples repo](https://github.com/restatedev/examples) +//! pub mod endpoint; pub mod service; @@ -168,24 +347,152 @@ pub use restate_sdk_macros::service; /// ``` pub use restate_sdk_macros::object; +/// +/// # Workflows +/// /// Entry-point macro to define a Restate [Workflow](https://docs.restate.dev/concepts/services#workflows). /// -/// For more details, check the [`service` macro](macro@crate::service) documentation. +/// Workflows are a sequence of steps that gets executed durably. +/// A workflow can be seen as a special type of [Virtual Object](https://docs.restate.dev/concepts/services#virtual-objects) with some special characteristics: /// -/// ## Shared handlers +/// - Each workflow definition has a `run` handler that implements the workflow logic. +/// - The `run` handler executes exactly one time for each workflow instance (object / key). +/// - A workflow definition can implement other handlers that can be called multiple times, and can interact with the workflow. +/// - Workflows have access to the `WorkflowContext` and `SharedWorkflowContext`, giving them some extra functionality, for example Durable Promises to signal workflows. /// -/// To define a shared handler, simply annotate the handler with the `#[shared]` annotation: +/// **Note: Workflow retention time**: +/// The retention time of a workflow execution is 24 hours after the finishing of the `run` handler. +/// After this timeout any [K/V state][crate::context::ContextReadState] is cleared, the workflow's shared handlers cannot be called anymore, and the Durable Promises are discarded. +/// The retention time can be configured via the [Admin API](https://docs.restate.dev//references/admin-api/#tag/service/operation/modify_service) per Workflow definition by setting `workflow_completion_retention`. +/// +/// ## Implementing workflows +/// Have a look at the code example to get a better understanding of how workflows are implemented: /// /// ```rust,no_run /// use restate_sdk::prelude::*; /// /// #[restate_sdk::workflow] -/// trait Billing { -/// async fn run() -> Result; +/// pub trait SignupWorkflow { +/// async fn run(req: String) -> Result; /// #[shared] -/// async fn get_status() -> Result; +/// async fn click(click_secret: String) -> Result<(), HandlerError>; +/// #[shared] +/// async fn get_status() -> Result; /// } +/// +/// pub struct SignupWorkflowImpl; +/// +/// impl SignupWorkflow for SignupWorkflowImpl { +/// +/// async fn run(&self, mut ctx: WorkflowContext<'_>, email: String) -> Result { +/// let secret = ctx.rand_uuid().to_string(); +/// ctx.run(|| send_email_with_link(email.clone(), secret.clone())).await?; +/// ctx.set("status", "Email sent".to_string()); +/// +/// let click_secret = ctx.promise::("email.clicked").await?; +/// ctx.set("status", "Email clicked".to_string()); +/// +/// Ok(click_secret == secret) +/// } +/// +/// async fn click(&self, ctx: SharedWorkflowContext<'_>, click_secret: String) -> Result<(), HandlerError> { +/// ctx.resolve_promise::("email.clicked", click_secret); +/// Ok(()) +/// } +/// +/// async fn get_status(&self, ctx: SharedWorkflowContext<'_>) -> Result { +/// Ok(ctx.get("status").await?.unwrap_or("unknown".to_string())) +/// } +/// +/// } +/// # async fn send_email_with_link(email: String, secret: String) -> Result<(), HandlerError> { +/// # Ok(()) +/// # } +/// +/// #[tokio::main] +/// async fn main() { +/// HttpServer::new(Endpoint::builder().bind(SignupWorkflowImpl.serve()).build()) +/// .listen_and_serve("0.0.0.0:9080".parse().unwrap()) +/// .await; +/// } +/// ``` +/// +/// ### The run handler +/// +/// Every workflow needs a `run` handler. +/// This handler has access to the same SDK features as Service and Virtual Object handlers. +/// In the example above, we use [`ctx.run`][crate::context::ContextSideEffects::run] to log the sending of the email in Restate and avoid re-execution on replay. +/// +/// +/// ## Shared handlers +/// +/// To define a shared handler, simply annotate the handler with the `#[shared]` annotation: +/// +/// ### Querying workflows +/// +/// Similar to Virtual Objects, you can retrieve the [K/V state][crate::context::ContextReadState] of workflows via the other handlers defined in the workflow definition, +/// In the example we expose the status of the workflow to external clients. +/// Every workflow execution can be seen as a new object, so the state is isolated to a single workflow execution. +/// The state can only be mutated by the `run` handler of the workflow. The other handlers can only read the state. +/// +/// ### Signaling workflows +/// +/// You can use Durable Promises to interact with your running workflows: to let the workflow block until an event occurs, or to send a signal / information into or out of a running workflow. +/// These promises are durable and distributed, meaning they survive crashes and can be resolved or rejected by any handler in the workflow. +/// +/// Do the following: +/// 1. Create a promise that is durable and distributed in the `run` handler, and wait for its completion. In the example, we wait on the promise `email.clicked`. +/// 2. Resolve or reject the promise in another handler in the workflow. This can be done at most one time. +/// In the example, the `click` handler gets called when the user clicks a link in an email and resolves the `email.clicked` promise. +/// +/// You can also use this pattern in reverse and let the `run` handler resolve promises that other handlers are waiting on. +/// For example, the `run` handler could resolve a promise when it finishes a step of the workflow, so that other handlers can request whether this step has been completed. +/// +/// ### Serving and registering workflows +/// +/// You serve workflows in the same way as Services and Virtual Objects. Have a look at the [Serving docs][crate::http_server]. +/// Make sure you [register the endpoint or Lambda handler](https://docs.restate.dev/operate/registration) in Restate before invoking it. +/// +/// **Tip: Workflows-as-code with Restate**: +/// [Check out some examples of workflows-as-code with Restate on the use case page](https://docs.restate.dev/use-cases/workflows). +/// +/// +/// ## Submitting workflows from a Restate service +/// [**Submit/query/signal**][crate::context::ContextClient]: +/// Call the workflow handlers in the same way as for Services and Virtual Objects. +/// You can only call the `run` handler (submit) once per workflow ID (here `"someone"`). +/// Check out the [Service Communication docs][crate::context::ContextClient] for more information. +/// +/// ## Submitting workflows over HTTP +/// [**Submit/query/signal**](https://docs.restate.dev/invoke/http#request-response-calls-over-http): +/// Call any handler of the workflow in the same way as for Services and Virtual Objects. +/// This returns the result of the handler once it has finished. +/// Add `/send` to the path for one-way calls. +/// You can only call the `run` handler once per workflow ID (here `"someone"`). +/// +/// ```shell +/// curl localhost:8080/SignupWorkflow/someone/run \ +/// -H 'content-type: application/json' \ +/// -d '"someone@restate.dev"' /// ``` +/// +/// [**Attach/peek**](https://docs.restate.dev/invoke/http#retrieve-result-of-invocations-and-workflows): +/// This lets you retrieve the result of a workflow or check if it's finished. +/// +/// ```shell +/// curl localhost:8080/restate/workflow/SignupWorkflow/someone/attach +/// curl localhost:8080/restate/workflow/SignupWorkflow/someone/output +/// ``` +/// +/// ## Inspecting workflows +/// +/// Have a look at the [introspection docs](https://docs.restate.dev/operate/introspection) on how to inspect workflows. +/// You can use this to for example: +/// - [Inspect the progress of a workflow by looking at the invocation journal](https://docs.restate.dev/operate/introspection#inspecting-the-invocation-journal) +/// - [Inspect the K/V state of a workflow](https://docs.restate.dev/operate/introspection#inspecting-application-state) +/// +/// +/// For more details, check the [`service` macro](macro@crate::service) documentation. pub use restate_sdk_macros::workflow; /// Prelude contains all the useful imports you need to get started with Restate. diff --git a/src/serde.rs b/src/serde.rs index 103f397..2967a0b 100644 --- a/src/serde.rs +++ b/src/serde.rs @@ -1,4 +1,12 @@ -//! Serialization/Deserialization traits and concrete implementations for handlers and state. +//! # Serialization +//! +//! Restate sends data over the network for storing state, journaling actions, awakeables, etc. +//! +//! Therefore, the types of the values that are stored, need to either: +//! - be a primitive type +//! - use a wrapper type [`Json`] for using [`serde-json`](https://serde.rs/) +//! - have the [`Serialize`] and [`Deserialize`] trait implemented +//! use bytes::Bytes; use std::convert::Infallible;