Skip to content

Commit c2b9046

Browse files
Service Protocol V4 (#46)
Upgrade the Rust SDK to use the latest Service Protocol V4. This includes few new features, such as: * Get call/send invocation id * Cancel invocation by invocation id * The method `send_with_delay` was renamed to `send_after`
1 parent 9151c75 commit c2b9046

21 files changed

+1051
-545
lines changed

.github/workflows/integration.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ jobs:
105105
cache-to: type=gha,mode=max,scope=${{ github.workflow }}
106106

107107
- name: Run test tool
108-
uses: restatedev/sdk-test-suite@v2.4
108+
uses: restatedev/sdk-test-suite@v3.0
109109
with:
110110
restateContainerImage: ${{ inputs.restateCommit != '' && 'localhost/restatedev/restate-commit-download:latest' || (inputs.restateImage != '' && inputs.restateImage || 'ghcr.io/restatedev/restate:main') }}
111111
serviceContainerImage: "restatedev/rust-test-services"

Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ default = ["http_server", "rand", "uuid"]
1212
hyper = ["dep:hyper", "http-body-util", "restate-sdk-shared-core/http"]
1313
http_server = ["hyper", "hyper/server", "hyper/http2", "hyper-util", "tokio/net", "tokio/signal", "tokio/macros"]
1414

15+
1516
[dependencies]
1617
bytes = "1.6.1"
1718
futures = "0.3"
@@ -23,8 +24,7 @@ pin-project-lite = "0.2"
2324
rand = { version = "0.8.5", optional = true }
2425
regress = "0.10"
2526
restate-sdk-macros = { version = "0.3.2", path = "macros" }
26-
restate-sdk-shared-core = "0.1.0"
27-
sha2 = "=0.11.0-pre.3"
27+
restate-sdk-shared-core = { git = "https://github.com/restatedev/sdk-shared-core.git", branch = "main", features = ["request_identity", "sha2_random_seed", "http"] }
2828
serde = "1.0"
2929
serde_json = "1.0"
3030
thiserror = "1.0.63"

README.md

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -121,10 +121,11 @@ The Rust SDK is currently in active development, and might break across releases
121121

122122
The compatibility with Restate is described in the following table:
123123

124-
| Restate Server\sdk-rust | 0.0/0.1/0.2 | 0.3 |
125-
|-------------------------|-------------|-----|
126-
| 1.0 |||
127-
| 1.1 |||
124+
| Restate Server\sdk-rust | 0.0/0.1/0.2 | 0.3 | 0.4 |
125+
|-------------------------|-------------|-----|-----|
126+
| 1.0 ||||
127+
| 1.1 ||||
128+
| 1.2 ||||
128129

129130
## Contributing
130131

examples/cron.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ impl PeriodicTaskImpl {
7777
.object_client::<PeriodicTaskClient>(context.key())
7878
.run()
7979
// And send with a delay
80-
.send_with_delay(Duration::from_secs(10));
80+
.send_after(Duration::from_secs(10));
8181
}
8282
}
8383

src/context/mod.rs

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use std::time::Duration;
88

99
mod request;
1010
mod run;
11-
pub use request::{Request, RequestTarget};
11+
pub use request::{CallFuture, InvocationHandle, Request, RequestTarget};
1212
pub use run::{RunClosure, RunFuture, RunRetryPolicy};
1313

1414
pub type HeaderMap = http::HeaderMap<String>;
@@ -370,20 +370,20 @@ impl<'ctx, CTX: private::SealedContext<'ctx>> ContextTimers<'ctx> for CTX {}
370370
/// // To a Service:
371371
/// ctx.service_client::<MyServiceClient>()
372372
/// .my_handler(String::from("Hi!"))
373-
/// .send_with_delay(Duration::from_millis(5000));
373+
/// .send_after(Duration::from_millis(5000));
374374
///
375375
/// // To a Virtual Object:
376376
/// ctx.object_client::<MyVirtualObjectClient>("Mary")
377377
/// .my_handler(String::from("Hi!"))
378-
/// .send_with_delay(Duration::from_millis(5000));
378+
/// .send_after(Duration::from_millis(5000));
379379
///
380380
/// // To a Workflow:
381381
/// ctx.workflow_client::<MyWorkflowClient>("my-workflow-id")
382382
/// .run(String::from("Hi!"))
383-
/// .send_with_delay(Duration::from_millis(5000));
383+
/// .send_after(Duration::from_millis(5000));
384384
/// ctx.workflow_client::<MyWorkflowClient>("my-workflow-id")
385385
/// .interact_with_workflow()
386-
/// .send_with_delay(Duration::from_millis(5000));
386+
/// .send_after(Duration::from_millis(5000));
387387
/// # Ok(())
388388
/// # }
389389
/// ```
@@ -433,6 +433,11 @@ pub trait ContextClient<'ctx>: private::SealedContext<'ctx> {
433433
Request::new(self.inner_context(), request_target, req)
434434
}
435435

436+
/// Create an [`InvocationHandle`] from an invocation id.
437+
fn invocation_handle(&self, invocation_id: String) -> impl InvocationHandle + 'ctx {
438+
self.inner_context().invocation_handle(invocation_id)
439+
}
440+
436441
/// Create a service client. The service client is generated by the [`restate_sdk_macros::service`] macro with the same name of the trait suffixed with `Client`.
437442
///
438443
/// ```rust,no_run
@@ -454,7 +459,7 @@ pub trait ContextClient<'ctx>: private::SealedContext<'ctx> {
454459
/// client.handle().send();
455460
///
456461
/// // Schedule the request to be executed later
457-
/// client.handle().send_with_delay(Duration::from_secs(60));
462+
/// client.handle().send_after(Duration::from_secs(60));
458463
/// # }
459464
/// ```
460465
fn service_client<C>(&self) -> C
@@ -485,7 +490,7 @@ pub trait ContextClient<'ctx>: private::SealedContext<'ctx> {
485490
/// client.handle().send();
486491
///
487492
/// // Schedule the request to be executed later
488-
/// client.handle().send_with_delay(Duration::from_secs(60));
493+
/// client.handle().send_after(Duration::from_secs(60));
489494
/// # }
490495
/// ```
491496
fn object_client<C>(&self, key: impl Into<String>) -> C
@@ -516,7 +521,7 @@ pub trait ContextClient<'ctx>: private::SealedContext<'ctx> {
516521
/// client.handle().send();
517522
///
518523
/// // Schedule the request to be executed later
519-
/// client.handle().send_with_delay(Duration::from_secs(60));
524+
/// client.handle().send_after(Duration::from_secs(60));
520525
/// # }
521526
/// ```
522527
fn workflow_client<C>(&self, key: impl Into<String>) -> C
@@ -627,7 +632,7 @@ pub trait ContextAwakeables<'ctx>: private::SealedContext<'ctx> {
627632
&self,
628633
) -> (
629634
String,
630-
impl Future<Output = Result<T, TerminalError>> + Send + Sync + 'ctx,
635+
impl Future<Output = Result<T, TerminalError>> + Send + 'ctx,
631636
) {
632637
self.inner_context().awakeable()
633638
}

src/context/request.rs

Lines changed: 28 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ impl fmt::Display for RequestTarget {
7272
pub struct Request<'a, Req, Res = ()> {
7373
ctx: &'a ContextInternal,
7474
request_target: RequestTarget,
75+
idempotency_key: Option<String>,
7576
req: Req,
7677
res: PhantomData<Res>,
7778
}
@@ -81,33 +82,54 @@ impl<'a, Req, Res> Request<'a, Req, Res> {
8182
Self {
8283
ctx,
8384
request_target,
85+
idempotency_key: None,
8486
req,
8587
res: PhantomData,
8688
}
8789
}
8890

91+
/// Add idempotency key to the request
92+
pub fn idempotency_key(mut self, idempotency_key: impl Into<String>) -> Self {
93+
self.idempotency_key = Some(idempotency_key.into());
94+
self
95+
}
96+
8997
/// Call a service. This returns a future encapsulating the response.
90-
pub fn call(self) -> impl Future<Output = Result<Res, TerminalError>> + Send
98+
pub fn call(self) -> impl CallFuture<Result<Res, TerminalError>> + Send
9199
where
92100
Req: Serialize + 'static,
93101
Res: Deserialize + 'static,
94102
{
95-
self.ctx.call(self.request_target, self.req)
103+
self.ctx
104+
.call(self.request_target, self.idempotency_key, self.req)
96105
}
97106

98107
/// Send the request to the service, without waiting for the response.
99-
pub fn send(self)
108+
pub fn send(self) -> impl InvocationHandle
100109
where
101110
Req: Serialize + 'static,
102111
{
103-
self.ctx.send(self.request_target, self.req, None)
112+
self.ctx
113+
.send(self.request_target, self.idempotency_key, self.req, None)
104114
}
105115

106116
/// Schedule the request to the service, without waiting for the response.
107-
pub fn send_with_delay(self, duration: Duration)
117+
pub fn send_after(self, delay: Duration) -> impl InvocationHandle
108118
where
109119
Req: Serialize + 'static,
110120
{
111-
self.ctx.send(self.request_target, self.req, Some(duration))
121+
self.ctx.send(
122+
self.request_target,
123+
self.idempotency_key,
124+
self.req,
125+
Some(delay),
126+
)
112127
}
113128
}
129+
130+
pub trait InvocationHandle {
131+
fn invocation_id(&self) -> impl Future<Output = Result<String, TerminalError>> + Send;
132+
fn cancel(&self) -> impl Future<Output = Result<(), TerminalError>> + Send;
133+
}
134+
135+
pub trait CallFuture<O>: Future<Output = O> + InvocationHandle {}

0 commit comments

Comments
 (0)