Skip to content

Commit e4a5856

Browse files
Adapt to test suite 3.0
1 parent 546b02a commit e4a5856

File tree

7 files changed

+295
-117
lines changed

7 files changed

+295
-117
lines changed

.github/workflows/integration.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ jobs:
105105
cache-to: type=gha,mode=max,scope=${{ github.workflow }}
106106

107107
- name: Run test tool
108-
uses: restatedev/sdk-test-suite@v2.4
108+
uses: restatedev/sdk-test-suite@v3.0
109109
with:
110110
restateContainerImage: ${{ inputs.restateCommit != '' && 'localhost/restatedev/restate-commit-download:latest' || (inputs.restateImage != '' && inputs.restateImage || 'ghcr.io/restatedev/restate:main') }}
111111
serviceContainerImage: "restatedev/rust-test-services"

test-services/README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,5 +9,5 @@ $ podman build -f test-services/Dockerfile -t restatedev/rust-test-services .
99
To run (download the [sdk-test-suite](https://github.com/restatedev/sdk-test-suite) first):
1010

1111
```shell
12-
$ java -jar restate-sdk-test-suite.jar run restatedev/rust-test-services
12+
$ java -jar restate-sdk-test-suite.jar run localhost/restatedev/rust-test-services:latest
1313
```

test-services/exclusions.yaml

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,21 @@
11
exclusions:
22
"alwaysSuspending":
3-
- "dev.restate.sdktesting.tests.AwaitTimeout"
3+
- "dev.restate.sdktesting.tests.Combinators.awakeableOrTimeoutUsingAwaitAny"
4+
- "dev.restate.sdktesting.tests.Combinators.awakeableOrTimeoutUsingAwakeableTimeoutCommand"
5+
- "dev.restate.sdktesting.tests.Combinators.firstSuccessfulCompletedAwakeable"
46
"default":
5-
- "dev.restate.sdktesting.tests.AwaitTimeout"
6-
- "dev.restate.sdktesting.tests.RawHandler"
7+
- "dev.restate.sdktesting.tests.Combinators.awakeableOrTimeoutUsingAwaitAny"
8+
- "dev.restate.sdktesting.tests.Combinators.awakeableOrTimeoutUsingAwakeableTimeoutCommand"
9+
- "dev.restate.sdktesting.tests.Combinators.firstSuccessfulCompletedAwakeable"
710
"singleThreadSinglePartition":
8-
- "dev.restate.sdktesting.tests.AwaitTimeout"
9-
- "dev.restate.sdktesting.tests.RawHandler"
11+
- "dev.restate.sdktesting.tests.Combinators.awakeableOrTimeoutUsingAwaitAny"
12+
- "dev.restate.sdktesting.tests.Combinators.awakeableOrTimeoutUsingAwakeableTimeoutCommand"
13+
- "dev.restate.sdktesting.tests.Combinators.firstSuccessfulCompletedAwakeable"
1014
"threeNodes":
11-
- "dev.restate.sdktesting.tests.AwaitTimeout"
12-
- "dev.restate.sdktesting.tests.RawHandler"
15+
- "dev.restate.sdktesting.tests.Combinators.awakeableOrTimeoutUsingAwaitAny"
16+
- "dev.restate.sdktesting.tests.Combinators.awakeableOrTimeoutUsingAwakeableTimeoutCommand"
17+
- "dev.restate.sdktesting.tests.Combinators.firstSuccessfulCompletedAwakeable"
1318
"threeNodesAlwaysSuspending":
14-
- "dev.restate.sdktesting.tests.AwaitTimeout"
19+
- "dev.restate.sdktesting.tests.Combinators.awakeableOrTimeoutUsingAwaitAny"
20+
- "dev.restate.sdktesting.tests.Combinators.awakeableOrTimeoutUsingAwakeableTimeoutCommand"
21+
- "dev.restate.sdktesting.tests.Combinators.firstSuccessfulCompletedAwakeable"

test-services/src/main.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ mod map_object;
99
mod non_deterministic;
1010
mod proxy;
1111
mod test_utils_service;
12+
mod virtual_object_command_interpreter;
1213

1314
use restate_sdk::prelude::{Endpoint, HttpServer};
1415
use std::env;
@@ -76,6 +77,13 @@ async fn main() {
7677
test_utils_service::TestUtilsServiceImpl,
7778
))
7879
}
80+
if services == "*" || services.contains("VirtualObjectCommandInterpreter") {
81+
builder = builder.bind(
82+
virtual_object_command_interpreter::VirtualObjectCommandInterpreter::serve(
83+
virtual_object_command_interpreter::VirtualObjectCommandInterpreterImpl,
84+
),
85+
)
86+
}
7987

8088
if let Ok(key) = env::var("E2E_REQUEST_SIGNING_ENV") {
8189
builder = builder.identity_key(&key).unwrap()

test-services/src/proxy.rs

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ pub(crate) struct ProxyRequest {
1111
service_name: String,
1212
virtual_object_key: Option<String>,
1313
handler_name: String,
14+
idempotency_key: Option<String>,
1415
message: Vec<u8>,
1516
delay_millis: Option<u64>,
1617
}
@@ -59,19 +60,22 @@ impl Proxy for ProxyImpl {
5960
ctx: Context<'_>,
6061
Json(req): Json<ProxyRequest>,
6162
) -> HandlerResult<Json<Vec<u8>>> {
62-
Ok(ctx
63-
.request::<Vec<u8>, Vec<u8>>(req.to_target(), req.message)
64-
.call()
65-
.await?
66-
.into())
63+
let mut request = ctx.request::<Vec<u8>, Vec<u8>>(req.to_target(), req.message);
64+
if let Some(idempotency_key) = req.idempotency_key {
65+
request = request.idempotency_key(idempotency_key);
66+
}
67+
Ok(request.call().await?.into())
6768
}
6869

6970
async fn one_way_call(
7071
&self,
7172
ctx: Context<'_>,
7273
Json(req): Json<ProxyRequest>,
7374
) -> HandlerResult<String> {
74-
let request = ctx.request::<_, ()>(req.to_target(), req.message);
75+
let mut request = ctx.request::<_, ()>(req.to_target(), req.message);
76+
if let Some(idempotency_key) = req.idempotency_key {
77+
request = request.idempotency_key(idempotency_key);
78+
}
7579

7680
let invocation_id = if let Some(delay_millis) = req.delay_millis {
7781
request
@@ -93,8 +97,11 @@ impl Proxy for ProxyImpl {
9397
let mut futures: Vec<BoxFuture<'_, Result<Vec<u8>, TerminalError>>> = vec![];
9498

9599
for req in requests {
96-
let restate_req =
100+
let mut restate_req =
97101
ctx.request::<_, Vec<u8>>(req.proxy_request.to_target(), req.proxy_request.message);
102+
if let Some(idempotency_key) = req.proxy_request.idempotency_key {
103+
restate_req = restate_req.idempotency_key(idempotency_key);
104+
}
98105
if req.one_way_call {
99106
if let Some(delay_millis) = req.proxy_request.delay_millis {
100107
restate_req.send_after(Duration::from_millis(delay_millis));
Lines changed: 7 additions & 100 deletions
Original file line numberDiff line numberDiff line change
@@ -1,71 +1,29 @@
1-
use crate::awakeable_holder;
2-
use crate::list_object::ListObjectClient;
31
use futures::future::BoxFuture;
42
use futures::FutureExt;
53
use restate_sdk::prelude::*;
6-
use serde::{Deserialize, Serialize};
74
use std::collections::HashMap;
5+
use std::convert::Infallible;
86
use std::sync::atomic::{AtomicU8, Ordering};
97
use std::sync::Arc;
108
use std::time::Duration;
119

12-
#[derive(Serialize, Deserialize)]
13-
#[serde(rename_all = "camelCase")]
14-
pub(crate) struct CreateAwakeableAndAwaitItRequest {
15-
awakeable_key: String,
16-
await_timeout: Option<u64>,
17-
}
18-
19-
#[derive(Serialize, Deserialize)]
20-
#[serde(tag = "type")]
21-
#[serde(rename_all_fields = "camelCase")]
22-
pub(crate) enum CreateAwakeableAndAwaitItResponse {
23-
#[serde(rename = "timeout")]
24-
Timeout,
25-
#[serde(rename = "result")]
26-
Result { value: String },
27-
}
28-
29-
#[derive(Serialize, Deserialize)]
30-
#[serde(rename_all = "camelCase")]
31-
pub(crate) struct InterpretRequest {
32-
list_name: String,
33-
commands: Vec<InterpretCommand>,
34-
}
35-
36-
#[derive(Serialize, Deserialize)]
37-
#[serde(tag = "type")]
38-
#[serde(rename_all_fields = "camelCase")]
39-
pub(crate) enum InterpretCommand {
40-
#[serde(rename = "createAwakeableAndAwaitIt")]
41-
CreateAwakeableAndAwaitIt { awakeable_key: String },
42-
#[serde(rename = "getEnvVariable")]
43-
GetEnvVariable { env_name: String },
44-
}
45-
4610
#[restate_sdk::service]
4711
#[name = "TestUtilsService"]
4812
pub(crate) trait TestUtilsService {
4913
#[name = "echo"]
5014
async fn echo(input: String) -> HandlerResult<String>;
5115
#[name = "uppercaseEcho"]
5216
async fn uppercase_echo(input: String) -> HandlerResult<String>;
17+
#[name = "rawEcho"]
18+
async fn raw_echo(input: Vec<u8>) -> Result<Vec<u8>, Infallible>;
5319
#[name = "echoHeaders"]
5420
async fn echo_headers() -> HandlerResult<Json<HashMap<String, String>>>;
55-
#[name = "createAwakeableAndAwaitIt"]
56-
async fn create_awakeable_and_await_it(
57-
req: Json<CreateAwakeableAndAwaitItRequest>,
58-
) -> HandlerResult<Json<CreateAwakeableAndAwaitItResponse>>;
5921
#[name = "sleepConcurrently"]
6022
async fn sleep_concurrently(millis_durations: Json<Vec<u64>>) -> HandlerResult<()>;
6123
#[name = "countExecutedSideEffects"]
6224
async fn count_executed_side_effects(increments: u32) -> HandlerResult<u32>;
63-
#[name = "getEnvVariable"]
64-
async fn get_env_variable(env: String) -> HandlerResult<String>;
6525
#[name = "cancelInvocation"]
6626
async fn cancel_invocation(invocation_id: String) -> Result<(), TerminalError>;
67-
#[name = "interpretCommands"]
68-
async fn interpret_commands(req: Json<InterpretRequest>) -> HandlerResult<()>;
6927
}
7028

7129
pub(crate) struct TestUtilsServiceImpl;
@@ -79,6 +37,10 @@ impl TestUtilsService for TestUtilsServiceImpl {
7937
Ok(input.to_ascii_uppercase())
8038
}
8139

40+
async fn raw_echo(&self, _: Context<'_>, input: Vec<u8>) -> Result<Vec<u8>, Infallible> {
41+
Ok(input)
42+
}
43+
8244
async fn echo_headers(
8345
&self,
8446
context: Context<'_>,
@@ -94,27 +56,6 @@ impl TestUtilsService for TestUtilsServiceImpl {
9456
Ok(headers.into())
9557
}
9658

97-
async fn create_awakeable_and_await_it(
98-
&self,
99-
context: Context<'_>,
100-
Json(req): Json<CreateAwakeableAndAwaitItRequest>,
101-
) -> HandlerResult<Json<CreateAwakeableAndAwaitItResponse>> {
102-
if req.await_timeout.is_some() {
103-
unimplemented!("await timeout is not yet implemented");
104-
}
105-
106-
let (awk_id, awakeable) = context.awakeable::<String>();
107-
108-
context
109-
.object_client::<awakeable_holder::AwakeableHolderClient>(req.awakeable_key)
110-
.hold(awk_id)
111-
.call()
112-
.await?;
113-
let value = awakeable.await?;
114-
115-
Ok(CreateAwakeableAndAwaitItResponse::Result { value }.into())
116-
}
117-
11859
async fn sleep_concurrently(
11960
&self,
12061
context: Context<'_>,
@@ -153,10 +94,6 @@ impl TestUtilsService for TestUtilsServiceImpl {
15394
Ok(counter.load(Ordering::SeqCst) as u32)
15495
}
15596

156-
async fn get_env_variable(&self, _: Context<'_>, env: String) -> HandlerResult<String> {
157-
Ok(std::env::var(env).ok().unwrap_or_default())
158-
}
159-
16097
async fn cancel_invocation(
16198
&self,
16299
ctx: Context<'_>,
@@ -165,34 +102,4 @@ impl TestUtilsService for TestUtilsServiceImpl {
165102
ctx.invocation_handle(invocation_id).cancel().await?;
166103
Ok(())
167104
}
168-
169-
async fn interpret_commands(
170-
&self,
171-
context: Context<'_>,
172-
Json(req): Json<InterpretRequest>,
173-
) -> HandlerResult<()> {
174-
let list_client = context.object_client::<ListObjectClient>(req.list_name);
175-
176-
for cmd in req.commands {
177-
match cmd {
178-
InterpretCommand::CreateAwakeableAndAwaitIt { awakeable_key } => {
179-
let (awk_id, awakeable) = context.awakeable::<String>();
180-
context
181-
.object_client::<awakeable_holder::AwakeableHolderClient>(awakeable_key)
182-
.hold(awk_id)
183-
.call()
184-
.await?;
185-
let value = awakeable.await?;
186-
list_client.append(value).send();
187-
}
188-
InterpretCommand::GetEnvVariable { env_name } => {
189-
list_client
190-
.append(std::env::var(env_name).ok().unwrap_or_default())
191-
.send();
192-
}
193-
}
194-
}
195-
196-
Ok(())
197-
}
198105
}

0 commit comments

Comments
 (0)