From 0521ff2689023e12b53e10e1439d71c93fec3594 Mon Sep 17 00:00:00 2001 From: Kevin Webb Date: Mon, 10 Feb 2025 12:28:52 -0500 Subject: [PATCH 01/12] initial testcontinaters framework --- Cargo.toml | 3 +- restate-test-utils/Cargo.toml | 22 ++++ restate-test-utils/src/lib.rs | 1 + restate-test-utils/src/test_utils.rs | 174 +++++++++++++++++++++++++++ tests/test_container.rs | 65 ++++++++++ 5 files changed, 264 insertions(+), 1 deletion(-) create mode 100644 restate-test-utils/Cargo.toml create mode 100644 restate-test-utils/src/lib.rs create mode 100644 restate-test-utils/src/test_utils.rs create mode 100644 tests/test_container.rs diff --git a/Cargo.toml b/Cargo.toml index ae96dcb..5385039 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -38,6 +38,7 @@ tracing-subscriber = "0.3" trybuild = "1.0" reqwest = { version = "0.12", features = ["json"] } rand = "0.8.5" +restate-test-utils = { path = "restate-test-utils" } [build-dependencies] jsonptr = "0.5.1" @@ -47,4 +48,4 @@ syn = "2.0" typify = { version = "0.1.0" } [workspace] -members = ["macros", "test-services"] +members = ["macros", "test-services", "restate-test-utils"] diff --git a/restate-test-utils/Cargo.toml b/restate-test-utils/Cargo.toml new file mode 100644 index 0000000..46ec839 --- /dev/null +++ b/restate-test-utils/Cargo.toml @@ -0,0 +1,22 @@ +[package] +name = "restate-test-utils" +version = "0.3.2" +edition = "2021" +description = "Test Utilities for Restate SDK for Rust" +license = "MIT" +repository = "https://github.com/restatedev/sdk-rust/test-utils" +rust-version = "1.76.0" + + +[dependencies] +futures = "0.3.31" +http = "1.2.0" +nu-ansi-term = "0.50.1" +reqwest = {version= "0.12.12", features = ["json"]} +restate-admin-rest-model = { git = "https://github.com/restatedev/restate/", branch = "release/1.1.6", version = "1.1.6" } +restate-sdk = { version = "0.3.2", path = ".." } +restate-sdk-shared-core = "0.2.0" +serde = "1.0.217" +serde_json = "1.0.138" +testcontainers = "0.23.1" +tokio = "1.43.0" diff --git a/restate-test-utils/src/lib.rs b/restate-test-utils/src/lib.rs new file mode 100644 index 0000000..681d26e --- /dev/null +++ b/restate-test-utils/src/lib.rs @@ -0,0 +1 @@ +pub mod test_utils; diff --git a/restate-test-utils/src/test_utils.rs b/restate-test-utils/src/test_utils.rs new file mode 100644 index 0000000..7f4e5b6 --- /dev/null +++ b/restate-test-utils/src/test_utils.rs @@ -0,0 +1,174 @@ + +use restate_admin_rest_model::deployments::RegisterDeploymentRequest; +use testcontainers::{core::{IntoContainerPort, WaitFor}, runners::AsyncRunner, ContainerAsync, ContainerRequest, GenericImage, ImageExt}; +use serde::{Serialize, Deserialize}; +use restate_sdk::{discovery::Service, errors::HandlerError}; +use std::time::Duration; + +#[derive(Serialize, Deserialize, Debug)] +struct VersionResponse { + version:String, + min_admin_api_version:u32, + max_admin_api_version:u32 +} + + +pub struct TestContainer { + container:ContainerAsync +} + +impl TestContainer { + + pub async fn new() -> Result { + + let image = GenericImage::new("docker.io/restatedev/restate", "latest") + .with_exposed_port(9070.tcp()) + .with_exposed_port(8080.tcp()) + .with_wait_for(WaitFor::message_on_stdout("Ingress HTTP listening")); + + // have to expose entire host network because testcontainer-rs doesn't implement selective SSH port forward from host + // see https://github.com/testcontainers/testcontainers-rs/issues/535 + let container = ContainerRequest::from(image) + .with_host("host.docker.internal" , testcontainers::core::Host::HostGateway) + .start().await?; + // .await?; + + let strout = container.stdout_to_vec().await.unwrap(); + println!("{}", String::from_utf8(strout).unwrap()); + + let host = container.get_host().await?; + let ports = container.ports().await?; + + let admin_port = ports.map_to_host_port_ipv4(9070.tcp()).unwrap(); + let ingress_port = ports.map_to_host_port_ipv4(8080.tcp()).unwrap(); + + let admin_url = format!("http://{}:{}/version", host, admin_port); + + println!("container host: {} admin port: {} ingress port: {}", + nu_ansi_term::Style::new().bold().paint(host.to_string()), + nu_ansi_term::Style::new().bold().paint(admin_port.to_string()), + nu_ansi_term::Style::new().bold().paint(ingress_port.to_string())); + + println!("admin url: {}", + nu_ansi_term::Style::new().bold().paint(admin_url.as_str())); + + let body = reqwest::get(admin_url) + .await? + .json::() + .await?; + + println!("body:\n{:?}", body); + + Ok(TestContainer {container}) + } + + pub async fn register(&self, server_port:u16) -> Result<(), HandlerError> { + + let host = self.container.get_host().await?; + let ports = self.container.ports().await?; + + let admin_port = ports.map_to_host_port_ipv4(9070.tcp()).unwrap(); + let server_url = format!("http://localhost:{}", admin_port); + + println!("health check: {}/health", server_url); + + let client = reqwest::Client::builder().http2_prior_knowledge().build().unwrap(); + + // wait for server to respond + while let Err(err) = client.get(format!("{}/health", server_url).to_string()) + .header("accept", "application/vnd.restate.endpointmanifest.v1+json") + .send().await { + tokio::time::sleep(Duration::from_secs(1)).await; + + println!("health check: {:?}", err); + } + + let ok = client.get(format!("{}/health", server_url).to_string()) + .header("accept", "application/vnd.restate.endpointmanifest.v1+json") + .send().await; + + println!("health check: {:?}", ok); + + println!("host started"); + + let uri = http::Uri::builder() + .scheme("http") + .authority(format!("host.docker.internal:{}", server_port)) + .path_and_query("/") + .build().unwrap(); + + let register_url = format!("http://{}:{}/deployments", host, admin_port); + println!("uri_str: {:?}", uri); + + let register_payload = RegisterDeploymentRequest::Http { uri, additional_headers: None, use_http_11: false, force: false, dry_run: false }; //, additional_headers: (), use_http_11: (), force: (), dry_run: () } + //let register_payload = format!("{{\"uri\": \"host.docker.internal:{}/\", \"force\": true}}", server_port); + + println!("register payload: \n{:?}", register_payload); + + // wait for registration + let registed = client.post(register_url.to_string()) + .json(®ister_payload) + .send().await; + + assert!(registed.is_ok()); + + let response =registed.unwrap(); + assert!(response.status().is_success()); + + println!("register response: {}\n{}", nu_ansi_term::Style::new().bold().paint(response.status().as_u16().to_string()), response.text().await.unwrap()); + println!("{}", nu_ansi_term::Style::new().bold().paint("host registered")); + + return Ok(()); + } + + pub async fn delay(milliseconds:u64) { + tokio::time::sleep(Duration::from_millis(milliseconds)).await; + } + + pub async fn invoke(&self, service:Service, handler:String) -> Result<(), HandlerError> { + + let host = self.container.get_host().await?; + let ports = self.container.ports().await?; + + let client = reqwest::Client::builder().http2_prior_knowledge().build().unwrap(); + + let service_name:String = service.name.to_string(); + let handler_names:Vec = service.handlers.iter().map(|h|h.name.to_string()).collect(); + + assert!(handler_names.contains(&handler)); + + let admin_port = ports.map_to_host_port_ipv4(9070.tcp()).unwrap(); + let admin_host = format!("http://{}:{}", host, admin_port); + + let service_discovery_url = format!("{}/services/{}/handlers", admin_host, service_name).to_string(); + + // TODO query admin API to verify registered handlers matches service camelCase descriptor + let discovery_response = client.get(service_discovery_url) + .send().await; + + let resposne = discovery_response.unwrap(); + println!("discovery response: {:?}", resposne); + println!("discovery response: {:?}", resposne.text().await.unwrap()); + + let ingress_port = ports.map_to_host_port_ipv4(8080.tcp()).unwrap(); + let ingress_host = format!("http://{}:{}", host, ingress_port); + + let ingress_handler_url = format!("{}/{}/{}", ingress_host, service_name, handler).to_string(); + println!("ingress url: {}", ingress_handler_url); + + let ingress_response = client.post(ingress_handler_url) + .send().await; + + let resposne = ingress_response.unwrap(); + println!("ingress response: {:?}", resposne); + println!("ingress response: {:?}", resposne.text().await.unwrap()); + + return Ok(()); + + } +} + +#[tokio::test] +async fn boot_test_container() { + let _test_comtainer = crate::test_utils::TestContainer::new().await.unwrap(); +} diff --git a/tests/test_container.rs b/tests/test_container.rs new file mode 100644 index 0000000..b42621b --- /dev/null +++ b/tests/test_container.rs @@ -0,0 +1,65 @@ +use restate_test_utils::test_utils::TestContainer; +use restate_sdk::{discovery::Service, prelude::*}; + +// Should compile +#[restate_sdk::service] +trait MyService { + async fn my_handler() -> HandlerResult; +} + +#[restate_sdk::object] +trait MyObject { + async fn my_handler(input: String) -> HandlerResult; + #[shared] + async fn my_shared_handler(input: String) -> HandlerResult; +} + +#[restate_sdk::workflow] +trait MyWorkflow { + async fn my_handler(input: String) -> HandlerResult; + #[shared] + async fn my_shared_handler(input: String) -> HandlerResult; +} + + +struct MyServiceImpl; + +impl MyService for MyServiceImpl { + async fn my_handler(&self, _: Context<'_>) -> HandlerResult { + let result = "hello!"; + Ok(result.to_string()) + } +} + +#[tokio::test] +async fn test_container_image() { + + let test_container = TestContainer::new().await.unwrap(); + + // uses higher port number to avoid collisions + // with non-test instances running locally + let host_port:u16 = 19080; + let host_address = format!("0.0.0.0:{}", host_port); + + println!("starting host"); + // boot restate server + tokio::spawn(async move { + HttpServer::new( + Endpoint::builder() + .bind(MyServiceImpl.serve()) + .build(), + ).listen_and_serve(host_address.parse().unwrap()).await; + }); + + use restate_sdk::service::Discoverable; + let my_service:Service = ServeMyService::::discover(); + + let registered = test_container.register(host_port).await; + + TestContainer::delay(1000).await; + + let response = test_container.invoke(my_service, "my_handler".to_string()).await; + + assert!(registered.is_ok()); + +} From b46ad689e7f2c4e092a244c34eb38d9c3047c406 Mon Sep 17 00:00:00 2001 From: Kevin Webb Date: Tue, 11 Feb 2025 10:56:39 -0500 Subject: [PATCH 02/12] updated test framework --- restate-test-utils/Cargo.toml | 1 - restate-test-utils/src/test_utils.rs | 193 ++++++++++++++++----------- tests/test_container.rs | 35 ++--- 3 files changed, 129 insertions(+), 100 deletions(-) diff --git a/restate-test-utils/Cargo.toml b/restate-test-utils/Cargo.toml index 46ec839..bef53bb 100644 --- a/restate-test-utils/Cargo.toml +++ b/restate-test-utils/Cargo.toml @@ -13,7 +13,6 @@ futures = "0.3.31" http = "1.2.0" nu-ansi-term = "0.50.1" reqwest = {version= "0.12.12", features = ["json"]} -restate-admin-rest-model = { git = "https://github.com/restatedev/restate/", branch = "release/1.1.6", version = "1.1.6" } restate-sdk = { version = "0.3.2", path = ".." } restate-sdk-shared-core = "0.2.0" serde = "1.0.217" diff --git a/restate-test-utils/src/test_utils.rs b/restate-test-utils/src/test_utils.rs index 7f4e5b6..7d5b010 100644 --- a/restate-test-utils/src/test_utils.rs +++ b/restate-test-utils/src/test_utils.rs @@ -1,10 +1,29 @@ - -use restate_admin_rest_model::deployments::RegisterDeploymentRequest; +use nu_ansi_term::Style; +use reqwest::Response; use testcontainers::{core::{IntoContainerPort, WaitFor}, runners::AsyncRunner, ContainerAsync, ContainerRequest, GenericImage, ImageExt}; use serde::{Serialize, Deserialize}; -use restate_sdk::{discovery::Service, errors::HandlerError}; +use restate_sdk::{discovery::Service, errors::HandlerError, prelude::{Endpoint, HttpServer}}; +use tokio::{io::{self, AsyncWriteExt}, task::{self, JoinHandle}}; use std::time::Duration; +// addapted from from restate-admin-rest-model crate version 1.1.6 +#[derive(Serialize, Deserialize, Debug)] +pub struct RegisterDeploymentRequestHttp { + uri: String, + additional_headers:Option>, + use_http_11: bool, + force: bool, + dry_run: bool +} + +#[derive(Serialize, Deserialize, Debug)] +pub struct RegisterDeploymentRequestLambda { + arn: String, + assume_role_arn: Option, + force: bool, + dry_run: bool, +} + #[derive(Serialize, Deserialize, Debug)] struct VersionResponse { version:String, @@ -12,16 +31,20 @@ struct VersionResponse { max_admin_api_version:u32 } - pub struct TestContainer { - container:ContainerAsync + container:ContainerAsync, + stdout_logging:JoinHandle<()>, + stderr_logging:JoinHandle<()>, + endpoint:Option> } + impl TestContainer { - pub async fn new() -> Result { + //"docker.io/restatedev/restate", "latest" + pub async fn new(image:String, version:String) -> Result { - let image = GenericImage::new("docker.io/restatedev/restate", "latest") + let image = GenericImage::new(image, version) .with_exposed_port(9070.tcp()) .with_exposed_port(8080.tcp()) .with_wait_for(WaitFor::message_on_stdout("Ingress HTTP listening")); @@ -30,39 +53,65 @@ impl TestContainer { // see https://github.com/testcontainers/testcontainers-rs/issues/535 let container = ContainerRequest::from(image) .with_host("host.docker.internal" , testcontainers::core::Host::HostGateway) - .start().await?; - // .await?; + .start() + .await?; - let strout = container.stdout_to_vec().await.unwrap(); - println!("{}", String::from_utf8(strout).unwrap()); + let mut container_stdout = container.stdout(true); + // Spawn a task to copy data from the AsyncBufRead to stdout + let stdout_logging = task::spawn(async move { + let mut stdout = io::stdout(); + if let Err(e) = io::copy(&mut container_stdout, &mut stdout).await { + eprintln!("Error copying data: {}", e); + } + }); + + let mut container_stderr = container.stderr(true); + // Spawn a task to copy data from the AsyncBufRead to stderr + let stderr_logging = task::spawn(async move { + let mut stderr = io::stderr(); + if let Err(e) = io::copy(&mut container_stderr, &mut stderr).await { + eprintln!("Error copying data: {}", e); + } + }); let host = container.get_host().await?; let ports = container.ports().await?; let admin_port = ports.map_to_host_port_ipv4(9070.tcp()).unwrap(); - let ingress_port = ports.map_to_host_port_ipv4(8080.tcp()).unwrap(); let admin_url = format!("http://{}:{}/version", host, admin_port); - - println!("container host: {} admin port: {} ingress port: {}", - nu_ansi_term::Style::new().bold().paint(host.to_string()), - nu_ansi_term::Style::new().bold().paint(admin_port.to_string()), - nu_ansi_term::Style::new().bold().paint(ingress_port.to_string())); - - println!("admin url: {}", - nu_ansi_term::Style::new().bold().paint(admin_url.as_str())); - - let body = reqwest::get(admin_url) + reqwest::get(admin_url) .await? .json::() .await?; - println!("body:\n{:?}", body); + Ok(TestContainer {container, stdout_logging, stderr_logging, endpoint:None}) + } + + pub async fn serve_endpoint(&mut self, endpoint:Endpoint) { + + println!("\n\n{}\n\n", Style::new().bold().paint(format!("starting enpoint server..."))); + // uses higher port number to avoid collisions + // with non-test instances running locally + let host_port:u16 = 19080; + let host_address = format!("0.0.0.0:{}", host_port); + + // boot restate server + let endpoint = tokio::spawn(async move { + HttpServer::new(endpoint) + .listen_and_serve(host_address.parse().unwrap()).await; + }); + + let registered = self.register(host_port).await; - Ok(TestContainer {container}) + assert!(registered.is_ok()); + + self.endpoint = Some(endpoint); } - pub async fn register(&self, server_port:u16) -> Result<(), HandlerError> { + async fn register(&self, server_port:u16) -> Result<(), HandlerError> { + + println!("\n\n{}\n\n", Style::new().bold().paint(format!("registering server..."))); let host = self.container.get_host().await?; let ports = self.container.ports().await?; @@ -70,53 +119,37 @@ impl TestContainer { let admin_port = ports.map_to_host_port_ipv4(9070.tcp()).unwrap(); let server_url = format!("http://localhost:{}", admin_port); - println!("health check: {}/health", server_url); - - let client = reqwest::Client::builder().http2_prior_knowledge().build().unwrap(); + let client = reqwest::Client::builder().http2_prior_knowledge().build()?; // wait for server to respond - while let Err(err) = client.get(format!("{}/health", server_url).to_string()) + while let Err(_) = client.get(format!("{}/health", server_url).to_string()) .header("accept", "application/vnd.restate.endpointmanifest.v1+json") .send().await { tokio::time::sleep(Duration::from_secs(1)).await; - - println!("health check: {:?}", err); } - let ok = client.get(format!("{}/health", server_url).to_string()) - .header("accept", "application/vnd.restate.endpointmanifest.v1+json") - .send().await; - - println!("health check: {:?}", ok); - - println!("host started"); - - let uri = http::Uri::builder() - .scheme("http") - .authority(format!("host.docker.internal:{}", server_port)) - .path_and_query("/") - .build().unwrap(); - - let register_url = format!("http://{}:{}/deployments", host, admin_port); - println!("uri_str: {:?}", uri); + client.get(format!("{}/health", server_url).to_string()) + .header("accept", "application/vnd.restate.endpointmanifest.v1+json") + .send().await?; - let register_payload = RegisterDeploymentRequest::Http { uri, additional_headers: None, use_http_11: false, force: false, dry_run: false }; //, additional_headers: (), use_http_11: (), force: (), dry_run: () } - //let register_payload = format!("{{\"uri\": \"host.docker.internal:{}/\", \"force\": true}}", server_port); + let deployment_uri:String = format!("http://host.docker.internal:{}/", server_port); + let deployment_payload = RegisterDeploymentRequestHttp { + uri:deployment_uri, + additional_headers:None, + use_http_11: false, + force: false, + dry_run: false }; //, additional_headers: (), use_http_11: (), force: (), dry_run: () } - println!("register payload: \n{:?}", register_payload); + let register_admin_url = format!("http://{}:{}/deployments", host, admin_port); - // wait for registration - let registed = client.post(register_url.to_string()) - .json(®ister_payload) - .send().await; + client.post(register_admin_url.to_string()) + .json(&deployment_payload) + .send().await?; - assert!(registed.is_ok()); - - let response =registed.unwrap(); - assert!(response.status().is_success()); + let ingress_port = ports.map_to_host_port_ipv4(8080.tcp()).unwrap(); + let ingress_host = format!("http://{}:{}", host, ingress_port); - println!("register response: {}\n{}", nu_ansi_term::Style::new().bold().paint(response.status().as_u16().to_string()), response.text().await.unwrap()); - println!("{}", nu_ansi_term::Style::new().bold().paint("host registered")); + println!("\n\n{}\n\n", Style::new().bold().paint(format!("ingress url: {}", ingress_host, ))); return Ok(()); } @@ -125,7 +158,7 @@ impl TestContainer { tokio::time::sleep(Duration::from_millis(milliseconds)).await; } - pub async fn invoke(&self, service:Service, handler:String) -> Result<(), HandlerError> { + pub async fn invoke(&self, service:Service, handler:String) -> Result { let host = self.container.get_host().await?; let ports = self.container.ports().await?; @@ -137,38 +170,42 @@ impl TestContainer { assert!(handler_names.contains(&handler)); + println!("\n\n{}\n\n", Style::new().bold().paint(format!("invoking {}/{}", service_name, handler))); + let admin_port = ports.map_to_host_port_ipv4(9070.tcp()).unwrap(); let admin_host = format!("http://{}:{}", host, admin_port); let service_discovery_url = format!("{}/services/{}/handlers", admin_host, service_name).to_string(); - // TODO query admin API to verify registered handlers matches service camelCase descriptor - let discovery_response = client.get(service_discovery_url) - .send().await; + client.get(service_discovery_url) + .send().await?; - let resposne = discovery_response.unwrap(); - println!("discovery response: {:?}", resposne); - println!("discovery response: {:?}", resposne.text().await.unwrap()); + // todo verify discovery response contains service/handler let ingress_port = ports.map_to_host_port_ipv4(8080.tcp()).unwrap(); let ingress_host = format!("http://{}:{}", host, ingress_port); let ingress_handler_url = format!("{}/{}/{}", ingress_host, service_name, handler).to_string(); - println!("ingress url: {}", ingress_handler_url); - let ingress_response = client.post(ingress_handler_url) - .send().await; + let ingress_resopnse = client.post(ingress_handler_url) + .send().await?; - let resposne = ingress_response.unwrap(); - println!("ingress response: {:?}", resposne); - println!("ingress response: {:?}", resposne.text().await.unwrap()); + return Ok(ingress_resopnse); + } +} - return Ok(()); +impl Drop for TestContainer { + fn drop(&mut self) { + + // todo cleanup on drop? + // testcontainers-rs already implements stop/rm on drop] + // https://docs.rs/testcontainers/latest/testcontainers/ + // } } -#[tokio::test] -async fn boot_test_container() { - let _test_comtainer = crate::test_utils::TestContainer::new().await.unwrap(); -} +// #[tokio::test] +// async fn boot_test_container() { +// let _test_comtainer = crate::test_utils::TestContainer::new("docker.io/restatedev/restate".to_string(), "latest".to_string()).await.unwrap(); +// } diff --git a/tests/test_container.rs b/tests/test_container.rs index b42621b..2e919b5 100644 --- a/tests/test_container.rs +++ b/tests/test_container.rs @@ -1,5 +1,5 @@ use restate_test_utils::test_utils::TestContainer; -use restate_sdk::{discovery::Service, prelude::*}; +use restate_sdk::{discovery::{self, Service}, prelude::*}; // Should compile #[restate_sdk::service] @@ -34,32 +34,25 @@ impl MyService for MyServiceImpl { #[tokio::test] async fn test_container_image() { - let test_container = TestContainer::new().await.unwrap(); + let mut test_container = TestContainer::new("docker.io/restatedev/restate".to_string(), "latest".to_string()).await.unwrap(); - // uses higher port number to avoid collisions - // with non-test instances running locally - let host_port:u16 = 19080; - let host_address = format!("0.0.0.0:{}", host_port); + let endpoint = Endpoint::builder() + .bind(MyServiceImpl.serve()) + .build(); - println!("starting host"); - // boot restate server - tokio::spawn(async move { - HttpServer::new( - Endpoint::builder() - .bind(MyServiceImpl.serve()) - .build(), - ).listen_and_serve(host_address.parse().unwrap()).await; - }); + test_container.serve_endpoint(endpoint).await; + // optionally insert a delays via tokio sleep + TestContainer::delay(1000).await; + + // optionally call invoke on service handlers use restate_sdk::service::Discoverable; let my_service:Service = ServeMyService::::discover(); + let invoke_response = test_container.invoke(my_service, "my_handler".to_string()).await; - let registered = test_container.register(host_port).await; - - TestContainer::delay(1000).await; - - let response = test_container.invoke(my_service, "my_handler".to_string()).await; + assert!(invoke_response.is_ok()); - assert!(registered.is_ok()); + println!("\n\ningress response:"); + println!("{}\n\n", invoke_response.unwrap().text().await.unwrap()); } From 94519536a663dd0317742975aa9ca34aa6406842 Mon Sep 17 00:00:00 2001 From: Kevin Webb Date: Tue, 11 Feb 2025 11:04:44 -0500 Subject: [PATCH 03/12] fix text example output --- tests/test_container.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/test_container.rs b/tests/test_container.rs index 2e919b5..ca2613c 100644 --- a/tests/test_container.rs +++ b/tests/test_container.rs @@ -52,7 +52,7 @@ async fn test_container_image() { assert!(invoke_response.is_ok()); - println!("\n\ningress response:"); - println!("{}\n\n", invoke_response.unwrap().text().await.unwrap()); + println!("invoke response:"); + println!("{}", invoke_response.unwrap().text().await.unwrap()); } From cffc3fa816fd5868c83382b79e5ade6db88f206f Mon Sep 17 00:00:00 2001 From: Kevin Webb Date: Tue, 11 Feb 2025 12:52:52 -0500 Subject: [PATCH 04/12] fix strings --- restate-test-utils/src/test_utils.rs | 16 ++++++++-------- tests/test_container.rs | 4 ++-- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/restate-test-utils/src/test_utils.rs b/restate-test-utils/src/test_utils.rs index 7d5b010..78b6f28 100644 --- a/restate-test-utils/src/test_utils.rs +++ b/restate-test-utils/src/test_utils.rs @@ -42,7 +42,7 @@ pub struct TestContainer { impl TestContainer { //"docker.io/restatedev/restate", "latest" - pub async fn new(image:String, version:String) -> Result { + pub async fn new(image:&str, version:&str) -> Result { let image = GenericImage::new(image, version) .with_exposed_port(9070.tcp()) @@ -122,13 +122,13 @@ impl TestContainer { let client = reqwest::Client::builder().http2_prior_knowledge().build()?; // wait for server to respond - while let Err(_) = client.get(format!("{}/health", server_url).to_string()) + while let Err(_) = client.get(format!("{}/health", server_url)) .header("accept", "application/vnd.restate.endpointmanifest.v1+json") .send().await { tokio::time::sleep(Duration::from_secs(1)).await; } - client.get(format!("{}/health", server_url).to_string()) + client.get(format!("{}/health", server_url)) .header("accept", "application/vnd.restate.endpointmanifest.v1+json") .send().await?; @@ -142,7 +142,7 @@ impl TestContainer { let register_admin_url = format!("http://{}:{}/deployments", host, admin_port); - client.post(register_admin_url.to_string()) + client.post(register_admin_url) .json(&deployment_payload) .send().await?; @@ -158,7 +158,7 @@ impl TestContainer { tokio::time::sleep(Duration::from_millis(milliseconds)).await; } - pub async fn invoke(&self, service:Service, handler:String) -> Result { + pub async fn invoke(&self, service:Service, handler:&str) -> Result { let host = self.container.get_host().await?; let ports = self.container.ports().await?; @@ -168,14 +168,14 @@ impl TestContainer { let service_name:String = service.name.to_string(); let handler_names:Vec = service.handlers.iter().map(|h|h.name.to_string()).collect(); - assert!(handler_names.contains(&handler)); + assert!(handler_names.contains(&handler.to_string())); println!("\n\n{}\n\n", Style::new().bold().paint(format!("invoking {}/{}", service_name, handler))); let admin_port = ports.map_to_host_port_ipv4(9070.tcp()).unwrap(); let admin_host = format!("http://{}:{}", host, admin_port); - let service_discovery_url = format!("{}/services/{}/handlers", admin_host, service_name).to_string(); + let service_discovery_url = format!("{}/services/{}/handlers", admin_host, service_name); client.get(service_discovery_url) .send().await?; @@ -185,7 +185,7 @@ impl TestContainer { let ingress_port = ports.map_to_host_port_ipv4(8080.tcp()).unwrap(); let ingress_host = format!("http://{}:{}", host, ingress_port); - let ingress_handler_url = format!("{}/{}/{}", ingress_host, service_name, handler).to_string(); + let ingress_handler_url = format!("{}/{}/{}", ingress_host, service_name, handler); let ingress_resopnse = client.post(ingress_handler_url) .send().await?; diff --git a/tests/test_container.rs b/tests/test_container.rs index ca2613c..4f3dc83 100644 --- a/tests/test_container.rs +++ b/tests/test_container.rs @@ -34,7 +34,7 @@ impl MyService for MyServiceImpl { #[tokio::test] async fn test_container_image() { - let mut test_container = TestContainer::new("docker.io/restatedev/restate".to_string(), "latest".to_string()).await.unwrap(); + let mut test_container = TestContainer::new("docker.io/restatedev/restate", "latest").await.unwrap(); let endpoint = Endpoint::builder() .bind(MyServiceImpl.serve()) @@ -48,7 +48,7 @@ async fn test_container_image() { // optionally call invoke on service handlers use restate_sdk::service::Discoverable; let my_service:Service = ServeMyService::::discover(); - let invoke_response = test_container.invoke(my_service, "my_handler".to_string()).await; + let invoke_response = test_container.invoke(my_service, "my_handler").await; assert!(invoke_response.is_ok()); From 475ae7d92743b6c955c924f839c675451ccd4c43 Mon Sep 17 00:00:00 2001 From: Kevin Webb Date: Tue, 18 Feb 2025 22:33:30 -0500 Subject: [PATCH 05/12] update testcontainer api + move to test-env crate --- Cargo.toml | 3 +- restate-test-utils/src/lib.rs | 1 - restate-test-utils/src/test_utils.rs | 211 --------------- {restate-test-utils => test-env}/Cargo.toml | 12 +- test-env/src/lib.rs | 281 ++++++++++++++++++++ test-env/tests/test_container.rs | 72 +++++ tests/test_container.rs | 58 ---- 7 files changed, 360 insertions(+), 278 deletions(-) delete mode 100644 restate-test-utils/src/lib.rs delete mode 100644 restate-test-utils/src/test_utils.rs rename {restate-test-utils => test-env}/Cargo.toml (60%) create mode 100644 test-env/src/lib.rs create mode 100644 test-env/tests/test_container.rs delete mode 100644 tests/test_container.rs diff --git a/Cargo.toml b/Cargo.toml index 5385039..e01eb47 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -38,7 +38,6 @@ tracing-subscriber = "0.3" trybuild = "1.0" reqwest = { version = "0.12", features = ["json"] } rand = "0.8.5" -restate-test-utils = { path = "restate-test-utils" } [build-dependencies] jsonptr = "0.5.1" @@ -48,4 +47,4 @@ syn = "2.0" typify = { version = "0.1.0" } [workspace] -members = ["macros", "test-services", "restate-test-utils"] +members = ["macros", "test-services", "test-env"] diff --git a/restate-test-utils/src/lib.rs b/restate-test-utils/src/lib.rs deleted file mode 100644 index 681d26e..0000000 --- a/restate-test-utils/src/lib.rs +++ /dev/null @@ -1 +0,0 @@ -pub mod test_utils; diff --git a/restate-test-utils/src/test_utils.rs b/restate-test-utils/src/test_utils.rs deleted file mode 100644 index 78b6f28..0000000 --- a/restate-test-utils/src/test_utils.rs +++ /dev/null @@ -1,211 +0,0 @@ -use nu_ansi_term::Style; -use reqwest::Response; -use testcontainers::{core::{IntoContainerPort, WaitFor}, runners::AsyncRunner, ContainerAsync, ContainerRequest, GenericImage, ImageExt}; -use serde::{Serialize, Deserialize}; -use restate_sdk::{discovery::Service, errors::HandlerError, prelude::{Endpoint, HttpServer}}; -use tokio::{io::{self, AsyncWriteExt}, task::{self, JoinHandle}}; -use std::time::Duration; - -// addapted from from restate-admin-rest-model crate version 1.1.6 -#[derive(Serialize, Deserialize, Debug)] -pub struct RegisterDeploymentRequestHttp { - uri: String, - additional_headers:Option>, - use_http_11: bool, - force: bool, - dry_run: bool -} - -#[derive(Serialize, Deserialize, Debug)] -pub struct RegisterDeploymentRequestLambda { - arn: String, - assume_role_arn: Option, - force: bool, - dry_run: bool, -} - -#[derive(Serialize, Deserialize, Debug)] -struct VersionResponse { - version:String, - min_admin_api_version:u32, - max_admin_api_version:u32 -} - -pub struct TestContainer { - container:ContainerAsync, - stdout_logging:JoinHandle<()>, - stderr_logging:JoinHandle<()>, - endpoint:Option> -} - - -impl TestContainer { - - //"docker.io/restatedev/restate", "latest" - pub async fn new(image:&str, version:&str) -> Result { - - let image = GenericImage::new(image, version) - .with_exposed_port(9070.tcp()) - .with_exposed_port(8080.tcp()) - .with_wait_for(WaitFor::message_on_stdout("Ingress HTTP listening")); - - // have to expose entire host network because testcontainer-rs doesn't implement selective SSH port forward from host - // see https://github.com/testcontainers/testcontainers-rs/issues/535 - let container = ContainerRequest::from(image) - .with_host("host.docker.internal" , testcontainers::core::Host::HostGateway) - .start() - .await?; - - let mut container_stdout = container.stdout(true); - // Spawn a task to copy data from the AsyncBufRead to stdout - let stdout_logging = task::spawn(async move { - let mut stdout = io::stdout(); - if let Err(e) = io::copy(&mut container_stdout, &mut stdout).await { - eprintln!("Error copying data: {}", e); - } - }); - - let mut container_stderr = container.stderr(true); - // Spawn a task to copy data from the AsyncBufRead to stderr - let stderr_logging = task::spawn(async move { - let mut stderr = io::stderr(); - if let Err(e) = io::copy(&mut container_stderr, &mut stderr).await { - eprintln!("Error copying data: {}", e); - } - }); - - let host = container.get_host().await?; - let ports = container.ports().await?; - - let admin_port = ports.map_to_host_port_ipv4(9070.tcp()).unwrap(); - - let admin_url = format!("http://{}:{}/version", host, admin_port); - reqwest::get(admin_url) - .await? - .json::() - .await?; - - Ok(TestContainer {container, stdout_logging, stderr_logging, endpoint:None}) - } - - pub async fn serve_endpoint(&mut self, endpoint:Endpoint) { - - println!("\n\n{}\n\n", Style::new().bold().paint(format!("starting enpoint server..."))); - // uses higher port number to avoid collisions - // with non-test instances running locally - let host_port:u16 = 19080; - let host_address = format!("0.0.0.0:{}", host_port); - - // boot restate server - let endpoint = tokio::spawn(async move { - HttpServer::new(endpoint) - .listen_and_serve(host_address.parse().unwrap()).await; - }); - - let registered = self.register(host_port).await; - - assert!(registered.is_ok()); - - self.endpoint = Some(endpoint); - } - - async fn register(&self, server_port:u16) -> Result<(), HandlerError> { - - println!("\n\n{}\n\n", Style::new().bold().paint(format!("registering server..."))); - - let host = self.container.get_host().await?; - let ports = self.container.ports().await?; - - let admin_port = ports.map_to_host_port_ipv4(9070.tcp()).unwrap(); - let server_url = format!("http://localhost:{}", admin_port); - - let client = reqwest::Client::builder().http2_prior_knowledge().build()?; - - // wait for server to respond - while let Err(_) = client.get(format!("{}/health", server_url)) - .header("accept", "application/vnd.restate.endpointmanifest.v1+json") - .send().await { - tokio::time::sleep(Duration::from_secs(1)).await; - } - - client.get(format!("{}/health", server_url)) - .header("accept", "application/vnd.restate.endpointmanifest.v1+json") - .send().await?; - - let deployment_uri:String = format!("http://host.docker.internal:{}/", server_port); - let deployment_payload = RegisterDeploymentRequestHttp { - uri:deployment_uri, - additional_headers:None, - use_http_11: false, - force: false, - dry_run: false }; //, additional_headers: (), use_http_11: (), force: (), dry_run: () } - - let register_admin_url = format!("http://{}:{}/deployments", host, admin_port); - - client.post(register_admin_url) - .json(&deployment_payload) - .send().await?; - - let ingress_port = ports.map_to_host_port_ipv4(8080.tcp()).unwrap(); - let ingress_host = format!("http://{}:{}", host, ingress_port); - - println!("\n\n{}\n\n", Style::new().bold().paint(format!("ingress url: {}", ingress_host, ))); - - return Ok(()); - } - - pub async fn delay(milliseconds:u64) { - tokio::time::sleep(Duration::from_millis(milliseconds)).await; - } - - pub async fn invoke(&self, service:Service, handler:&str) -> Result { - - let host = self.container.get_host().await?; - let ports = self.container.ports().await?; - - let client = reqwest::Client::builder().http2_prior_knowledge().build().unwrap(); - - let service_name:String = service.name.to_string(); - let handler_names:Vec = service.handlers.iter().map(|h|h.name.to_string()).collect(); - - assert!(handler_names.contains(&handler.to_string())); - - println!("\n\n{}\n\n", Style::new().bold().paint(format!("invoking {}/{}", service_name, handler))); - - let admin_port = ports.map_to_host_port_ipv4(9070.tcp()).unwrap(); - let admin_host = format!("http://{}:{}", host, admin_port); - - let service_discovery_url = format!("{}/services/{}/handlers", admin_host, service_name); - - client.get(service_discovery_url) - .send().await?; - - // todo verify discovery response contains service/handler - - let ingress_port = ports.map_to_host_port_ipv4(8080.tcp()).unwrap(); - let ingress_host = format!("http://{}:{}", host, ingress_port); - - let ingress_handler_url = format!("{}/{}/{}", ingress_host, service_name, handler); - - let ingress_resopnse = client.post(ingress_handler_url) - .send().await?; - - return Ok(ingress_resopnse); - } -} - -impl Drop for TestContainer { - fn drop(&mut self) { - - // todo cleanup on drop? - // testcontainers-rs already implements stop/rm on drop] - // https://docs.rs/testcontainers/latest/testcontainers/ - // - - } -} - -// #[tokio::test] -// async fn boot_test_container() { -// let _test_comtainer = crate::test_utils::TestContainer::new("docker.io/restatedev/restate".to_string(), "latest".to_string()).await.unwrap(); -// } diff --git a/restate-test-utils/Cargo.toml b/test-env/Cargo.toml similarity index 60% rename from restate-test-utils/Cargo.toml rename to test-env/Cargo.toml index bef53bb..98fc093 100644 --- a/restate-test-utils/Cargo.toml +++ b/test-env/Cargo.toml @@ -1,21 +1,21 @@ [package] -name = "restate-test-utils" +name = "test-env" version = "0.3.2" edition = "2021" description = "Test Utilities for Restate SDK for Rust" license = "MIT" -repository = "https://github.com/restatedev/sdk-rust/test-utils" +repository = "https://github.com/restatedev/sdk-rust" rust-version = "1.76.0" [dependencies] -futures = "0.3.31" -http = "1.2.0" +anyhow = "1.0.95" nu-ansi-term = "0.50.1" reqwest = {version= "0.12.12", features = ["json"]} -restate-sdk = { version = "0.3.2", path = ".." } -restate-sdk-shared-core = "0.2.0" +restate-sdk = {path = "../"} serde = "1.0.217" serde_json = "1.0.138" testcontainers = "0.23.1" tokio = "1.43.0" +tracing = "0.1.41" +tracing-subscriber = "0.3.19" diff --git a/test-env/src/lib.rs b/test-env/src/lib.rs new file mode 100644 index 0000000..90d98ed --- /dev/null +++ b/test-env/src/lib.rs @@ -0,0 +1,281 @@ +use tracing::{error, info, warn}; +use testcontainers::{core::{IntoContainerPort, WaitFor}, runners::AsyncRunner, ContainerAsync, ContainerRequest, GenericImage, ImageExt}; +use serde::{Serialize, Deserialize}; +use restate_sdk::{errors::HandlerError, prelude::{Endpoint, HttpServer}}; +use tokio::{io::AsyncBufReadExt, net::TcpListener, task::{self, JoinHandle}}; +use std::time::Duration; + +// addapted from from restate-admin-rest-model crate version 1.1.6 +#[derive(Serialize, Deserialize, Debug)] +pub struct RegisterDeploymentRequestHttp { + uri: String, + additional_headers:Option>, + use_http_11: bool, + force: bool, + dry_run: bool +} + +#[derive(Serialize, Deserialize, Debug)] +pub struct RegisterDeploymentRequestLambda { + arn: String, + assume_role_arn: Option, + force: bool, + dry_run: bool, +} + +#[derive(Serialize, Deserialize, Debug)] +struct VersionResponse { + version:String, + min_admin_api_version:u32, + max_admin_api_version:u32 +} + +pub struct TestContainerBuilder { + container_name:String, + container_tag:String, + logging:bool +} + +impl TestContainerBuilder { + + pub fn new() -> TestContainerBuilder { + + TestContainerBuilder { container_name: "docker.io/restatedev/restate".to_string(), + container_tag: "latest".to_string(), + logging: false } + + } + + pub fn with_container_logging(mut self) -> TestContainerBuilder { + self.logging = true; + self + } + + pub fn with_container(mut self, container_name:String, container_tag:String) -> TestContainerBuilder { + self.container_name = container_name; + self.container_tag = container_tag; + + self + } + + pub fn build(self) -> TestContainer { + TestContainer { builder: self, + container: None, + endpoint_task: None, + endpoint_server_ip: None, + endpoint_server_port: None, + endpoint_server_url: None, + ingress_url: None, + stdout_logging: None, + stderr_logging: None } + } +} + +pub struct TestContainer { + builder:TestContainerBuilder, + container:Option>, + endpoint_task:Option>, + endpoint_server_ip:Option, + endpoint_server_port:Option, + endpoint_server_url:Option, + ingress_url:Option, + stdout_logging:Option>, + stderr_logging:Option>, +} + +impl TestContainer { + + pub fn new() -> TestContainer { + TestContainerBuilder::new().build() + } + + pub fn builder() -> TestContainerBuilder { + + TestContainerBuilder::new() + + } + + pub async fn start(mut self, endpoint:Endpoint) -> Result { + + self.serve_endpoint(endpoint).await?; + self.start_container().await?; + let registered = self.register_endpoint().await; + if registered.is_err() { + return Err(anyhow::anyhow!("Failed to register endpoint")); + } + + Ok(self) + } + + async fn serve_endpoint(&mut self, endpoint:Endpoint) -> Result<(), anyhow::Error> { + + info!("Starting endpoint server..."); + + // use port 0 to allow tokio to assign unused port number + let host_address = format!("127.0.0.1:0"); + let listener = TcpListener::bind(host_address).await.expect("listener can bind"); + self.endpoint_server_ip = Some(listener.local_addr().unwrap().ip().to_string()); + self.endpoint_server_port = Some(listener.local_addr().unwrap().port()); + self.endpoint_server_url = Some(format!("http://{}:{}", self.endpoint_server_ip.as_ref().unwrap(), self.endpoint_server_port.as_ref().unwrap())); + + // boot restate server + self.endpoint_task = Some(tokio::spawn(async move { + HttpServer::new(endpoint) + //.serve(listener).await; + .serve(listener).await; + })); + + let client = reqwest::Client::builder().http2_prior_knowledge().build()?; + + // wait for server to respond + while let Err(_) = client.get(format!("{}/discover", self.endpoint_server_url.as_ref().unwrap())) + .header("accept", "application/vnd.restate.endpointmanifest.v1+json") + .send().await { + tokio::time::sleep(Duration::from_secs(1)).await; + + warn!("retrying endpoint server"); + } + + client.get(format!("{}/discover", self.endpoint_server_url.as_ref().unwrap())) + .header("accept", "application/vnd.restate.endpointmanifest.v1+json") + .send().await?; + + info!("endpoint server: {}", self.endpoint_server_url.as_ref().unwrap()); + + Ok(()) + } + + async fn start_container(&mut self) -> Result<(), anyhow::Error> { + + let image = GenericImage::new(&self.builder.container_name, &self.builder.container_tag) + .with_exposed_port(9070.tcp()) + .with_exposed_port(8080.tcp()) + .with_wait_for(WaitFor::message_on_stdout("Ingress HTTP listening")); + + // have to expose entire host network because testcontainer-rs doesn't implement selective SSH port forward from host + // see https://github.com/testcontainers/testcontainers-rs/issues/535 + self.container = Some(ContainerRequest::from(image) + .with_host("host.docker.internal" , testcontainers::core::Host::HostGateway) + .start() + .await?); + + if self.builder.logging { + + let container_stdout = self.container.as_ref().unwrap().stdout(true); + let mut stdout_lines = container_stdout.lines(); + + // Spawn a task to copy data from the AsyncBufRead to stdout + let stdout_logging = task::spawn(async move { + while let Some(line) = stdout_lines.next_line().await.transpose() { + match line { + Ok(line) => { + // Log each line using tracing + info!("{}", line); + } + Err(e) => { + // Log any errors + error!("Error reading from container stream: {}", e); + break; + } + } + } + }); + + self.stderr_logging = Some(stdout_logging); + + let container_stderr = self.container.as_ref().unwrap().stderr(true); + let mut stderr_lines = container_stderr.lines(); + + // Spawn a task to copy data from the AsyncBufRead to stderr + let stderr_logging = task::spawn(async move { + while let Some(line) = stderr_lines.next_line().await.transpose() { + match line { + Ok(line) => { + // Log each line using tracing + error!("{}", line); + } + Err(e) => { + // Log any errors + error!("Error reading from container stream: {}", e); + break; + } + } + } + }); + + self.stderr_logging = Some(stderr_logging); + } + + + let host = self.container.as_ref().unwrap().get_host().await?; + let ports = self.container.as_ref().unwrap().ports().await?; + + let admin_port = ports.map_to_host_port_ipv4(9070.tcp()).unwrap(); + + let admin_url = format!("http://{}:{}/version", host, admin_port); + reqwest::get(admin_url) + .await? + .json::() + .await?; + + Ok(()) + } + + async fn register_endpoint(&mut self) -> Result<(), HandlerError> { + + info!("registering endpoint server: {}", self.endpoint_server_url.as_ref().unwrap()); + + let host = self.container.as_ref().unwrap().get_host().await?; + let ports = self.container.as_ref().unwrap().ports().await?; + + let admin_port = ports.map_to_host_port_ipv4(9070.tcp()).unwrap(); + + let client = reqwest::Client::builder().http2_prior_knowledge().build()?; + + let deployment_uri:String = format!("http://host.docker.internal:{}/", self.endpoint_server_port.unwrap()); + let deployment_payload = RegisterDeploymentRequestHttp { + uri:deployment_uri, + additional_headers:None, + use_http_11: false, + force: false, + dry_run: false }; //, additional_headers: (), use_http_11: (), force: (), dry_run: () } + + let register_admin_url = format!("http://{}:{}/deployments", host, admin_port); + + client.post(register_admin_url) + .json(&deployment_payload) + .send().await?; + + let ingress_port = ports.map_to_host_port_ipv4(8080.tcp()).unwrap(); + self.ingress_url = Some(format!("http://{}:{}", host, ingress_port)); + + info!("ingress url: {}", self.ingress_url.as_ref().unwrap()); + + return Ok(()); + } + + pub fn ingress_url(&self) -> String { + self.ingress_url.clone().unwrap() + } +} + +impl Drop for TestContainer { + fn drop(&mut self) { + + // testcontainers-rs already implements stop/rm on drop] + // https://docs.rs/testcontainers/latest/testcontainers/ + + // clean up tokio tasks + if self.endpoint_task.is_some() { + self.endpoint_task.take().unwrap().abort(); + } + + if self.stdout_logging.is_some() { + self.stdout_logging.take().unwrap().abort(); + } + + if self.stderr_logging.is_some() { + self.stderr_logging.take().unwrap().abort(); + } + } +} diff --git a/test-env/tests/test_container.rs b/test-env/tests/test_container.rs new file mode 100644 index 0000000..e22e9eb --- /dev/null +++ b/test-env/tests/test_container.rs @@ -0,0 +1,72 @@ +use reqwest::StatusCode; +use restate_sdk::prelude::*; +use test_env::TestContainer; +use tracing::info; + +// Should compile +#[restate_sdk::service] +trait MyService { + async fn my_handler() -> HandlerResult; +} + +#[restate_sdk::object] +trait MyObject { + async fn my_handler(input: String) -> HandlerResult; + #[shared] + async fn my_shared_handler(input: String) -> HandlerResult; +} + +#[restate_sdk::workflow] +trait MyWorkflow { + async fn my_handler(input: String) -> HandlerResult; + #[shared] + async fn my_shared_handler(input: String) -> HandlerResult; +} + + +struct MyServiceImpl; + +impl MyService for MyServiceImpl { + async fn my_handler(&self, _: Context<'_>) -> HandlerResult { + let result = "hello!"; + Ok(result.to_string()) + } +} + +#[tokio::test] +async fn test_container() { + + tracing_subscriber::fmt::fmt() + .with_max_level(tracing::Level::INFO) // Set the maximum log level + .init(); + + let endpoint = Endpoint::builder() + .bind(MyServiceImpl.serve()) + .build(); + + // simple test container intialization with default configuration + // let mut test_container = TestContainer::new().start(endpoint).await.unwrap(); + + // custom test container initialization with builder + let test_container = TestContainer::builder() + // optional passthrough logging from the resstate server testcontainer + // prints container logs to tracing::info level + .with_container_logging() + .with_container("docker.io/restatedev/restate".to_string(), "latest".to_string()) + .build() + .start(endpoint).await.unwrap(); + + let ingress_url = test_container.ingress_url(); + + // call container ingress url for /MyService/my_handler + let resposne = reqwest::Client::new().post(format!("{}/MyService/my_handler", ingress_url)) + .header("Accept", "application/json") + .header("Content-Type", "*/*") + .header("idempotency-key", "abc") + .send().await.unwrap(); + + assert_eq!(resposne.status(), StatusCode::OK); + + info!("/MyService/my_handler response: {:?}", resposne.text().await.unwrap()); + +} diff --git a/tests/test_container.rs b/tests/test_container.rs deleted file mode 100644 index 4f3dc83..0000000 --- a/tests/test_container.rs +++ /dev/null @@ -1,58 +0,0 @@ -use restate_test_utils::test_utils::TestContainer; -use restate_sdk::{discovery::{self, Service}, prelude::*}; - -// Should compile -#[restate_sdk::service] -trait MyService { - async fn my_handler() -> HandlerResult; -} - -#[restate_sdk::object] -trait MyObject { - async fn my_handler(input: String) -> HandlerResult; - #[shared] - async fn my_shared_handler(input: String) -> HandlerResult; -} - -#[restate_sdk::workflow] -trait MyWorkflow { - async fn my_handler(input: String) -> HandlerResult; - #[shared] - async fn my_shared_handler(input: String) -> HandlerResult; -} - - -struct MyServiceImpl; - -impl MyService for MyServiceImpl { - async fn my_handler(&self, _: Context<'_>) -> HandlerResult { - let result = "hello!"; - Ok(result.to_string()) - } -} - -#[tokio::test] -async fn test_container_image() { - - let mut test_container = TestContainer::new("docker.io/restatedev/restate", "latest").await.unwrap(); - - let endpoint = Endpoint::builder() - .bind(MyServiceImpl.serve()) - .build(); - - test_container.serve_endpoint(endpoint).await; - - // optionally insert a delays via tokio sleep - TestContainer::delay(1000).await; - - // optionally call invoke on service handlers - use restate_sdk::service::Discoverable; - let my_service:Service = ServeMyService::::discover(); - let invoke_response = test_container.invoke(my_service, "my_handler").await; - - assert!(invoke_response.is_ok()); - - println!("invoke response:"); - println!("{}", invoke_response.unwrap().text().await.unwrap()); - -} From 44a4255d71a3ff0a2fee60d6cf806fb77c9b5920 Mon Sep 17 00:00:00 2001 From: Kevin Webb Date: Wed, 19 Feb 2025 08:58:11 -0500 Subject: [PATCH 06/12] cleanup / lint + fmt fixes --- test-env/src/lib.rs | 212 ++++++++++++++++++------------- test-env/tests/test_container.rs | 52 ++++---- 2 files changed, 155 insertions(+), 109 deletions(-) diff --git a/test-env/src/lib.rs b/test-env/src/lib.rs index 90d98ed..c3488d3 100644 --- a/test-env/src/lib.rs +++ b/test-env/src/lib.rs @@ -1,18 +1,29 @@ -use tracing::{error, info, warn}; -use testcontainers::{core::{IntoContainerPort, WaitFor}, runners::AsyncRunner, ContainerAsync, ContainerRequest, GenericImage, ImageExt}; -use serde::{Serialize, Deserialize}; -use restate_sdk::{errors::HandlerError, prelude::{Endpoint, HttpServer}}; -use tokio::{io::AsyncBufReadExt, net::TcpListener, task::{self, JoinHandle}}; +use restate_sdk::{ + errors::HandlerError, + prelude::{Endpoint, HttpServer}, +}; +use serde::{Deserialize, Serialize}; use std::time::Duration; +use testcontainers::{ + core::{IntoContainerPort, WaitFor}, + runners::AsyncRunner, + ContainerAsync, ContainerRequest, GenericImage, ImageExt, +}; +use tokio::{ + io::AsyncBufReadExt, + net::TcpListener, + task::{self, JoinHandle}, +}; +use tracing::{error, info, warn}; // addapted from from restate-admin-rest-model crate version 1.1.6 #[derive(Serialize, Deserialize, Debug)] pub struct RegisterDeploymentRequestHttp { uri: String, - additional_headers:Option>, + additional_headers: Option>, use_http_11: bool, force: bool, - dry_run: bool + dry_run: bool, } #[derive(Serialize, Deserialize, Debug)] @@ -25,33 +36,38 @@ pub struct RegisterDeploymentRequestLambda { #[derive(Serialize, Deserialize, Debug)] struct VersionResponse { - version:String, - min_admin_api_version:u32, - max_admin_api_version:u32 + version: String, + min_admin_api_version: u32, + max_admin_api_version: u32, } pub struct TestContainerBuilder { - container_name:String, - container_tag:String, - logging:bool + container_name: String, + container_tag: String, + logging: bool, } -impl TestContainerBuilder { - - pub fn new() -> TestContainerBuilder { - - TestContainerBuilder { container_name: "docker.io/restatedev/restate".to_string(), - container_tag: "latest".to_string(), - logging: false } - +impl Default for TestContainerBuilder { + fn default() -> Self { + TestContainerBuilder { + container_name: "docker.io/restatedev/restate".to_string(), + container_tag: "latest".to_string(), + logging: false, + } } +} +impl TestContainerBuilder { pub fn with_container_logging(mut self) -> TestContainerBuilder { self.logging = true; self } - pub fn with_container(mut self, container_name:String, container_tag:String) -> TestContainerBuilder { + pub fn with_container( + mut self, + container_name: String, + container_tag: String, + ) -> TestContainerBuilder { self.container_name = container_name; self.container_tag = container_tag; @@ -59,44 +75,44 @@ impl TestContainerBuilder { } pub fn build(self) -> TestContainer { - TestContainer { builder: self, - container: None, - endpoint_task: None, - endpoint_server_ip: None, - endpoint_server_port: None, - endpoint_server_url: None, - ingress_url: None, - stdout_logging: None, - stderr_logging: None } + TestContainer { + builder: self, + container: None, + endpoint_task: None, + endpoint_server_ip: None, + endpoint_server_port: None, + endpoint_server_url: None, + ingress_url: None, + stdout_logging: None, + stderr_logging: None, + } } } pub struct TestContainer { - builder:TestContainerBuilder, - container:Option>, - endpoint_task:Option>, - endpoint_server_ip:Option, - endpoint_server_port:Option, - endpoint_server_url:Option, - ingress_url:Option, - stdout_logging:Option>, - stderr_logging:Option>, + builder: TestContainerBuilder, + container: Option>, + endpoint_task: Option>, + endpoint_server_ip: Option, + endpoint_server_port: Option, + endpoint_server_url: Option, + ingress_url: Option, + stdout_logging: Option>, + stderr_logging: Option>, } -impl TestContainer { - - pub fn new() -> TestContainer { - TestContainerBuilder::new().build() +impl Default for TestContainer { + fn default() -> Self { + TestContainerBuilder::default().build() } +} +impl TestContainer { pub fn builder() -> TestContainerBuilder { - - TestContainerBuilder::new() - + TestContainerBuilder::default() } - pub async fn start(mut self, endpoint:Endpoint) -> Result { - + pub async fn start(mut self, endpoint: Endpoint) -> Result { self.serve_endpoint(endpoint).await?; self.start_container().await?; let registered = self.register_endpoint().await; @@ -107,46 +123,60 @@ impl TestContainer { Ok(self) } - async fn serve_endpoint(&mut self, endpoint:Endpoint) -> Result<(), anyhow::Error> { - + async fn serve_endpoint(&mut self, endpoint: Endpoint) -> Result<(), anyhow::Error> { info!("Starting endpoint server..."); // use port 0 to allow tokio to assign unused port number - let host_address = format!("127.0.0.1:0"); - let listener = TcpListener::bind(host_address).await.expect("listener can bind"); - self.endpoint_server_ip = Some(listener.local_addr().unwrap().ip().to_string()); + let host_address = "127.0.0.1:0".to_string(); + let listener = TcpListener::bind(host_address) + .await + .expect("listener can bind"); + self.endpoint_server_ip = Some(listener.local_addr().unwrap().ip().to_string()); self.endpoint_server_port = Some(listener.local_addr().unwrap().port()); - self.endpoint_server_url = Some(format!("http://{}:{}", self.endpoint_server_ip.as_ref().unwrap(), self.endpoint_server_port.as_ref().unwrap())); + self.endpoint_server_url = Some(format!( + "http://{}:{}", + self.endpoint_server_ip.as_ref().unwrap(), + self.endpoint_server_port.as_ref().unwrap() + )); // boot restate server self.endpoint_task = Some(tokio::spawn(async move { - HttpServer::new(endpoint) - //.serve(listener).await; - .serve(listener).await; + HttpServer::new(endpoint).serve(listener).await; })); let client = reqwest::Client::builder().http2_prior_knowledge().build()?; // wait for server to respond - while let Err(_) = client.get(format!("{}/discover", self.endpoint_server_url.as_ref().unwrap())) - .header("accept", "application/vnd.restate.endpointmanifest.v1+json") - .send().await { - tokio::time::sleep(Duration::from_secs(1)).await; + let mut retries = 0; + while client + .get(format!( + "{}/discover", + self.endpoint_server_url.as_ref().unwrap() + )) + .header("accept", "application/vnd.restate.endpointmanifest.v1+json") + .send() + .await + .is_err() + { + tokio::time::sleep(Duration::from_millis(100)).await; warn!("retrying endpoint server"); - } - client.get(format!("{}/discover", self.endpoint_server_url.as_ref().unwrap())) - .header("accept", "application/vnd.restate.endpointmanifest.v1+json") - .send().await?; + retries += 1; + if retries > 10 { + return Err(anyhow::anyhow!("endpoint server failed to start")); + } + } - info!("endpoint server: {}", self.endpoint_server_url.as_ref().unwrap()); + info!( + "endpoint server: {}", + self.endpoint_server_url.as_ref().unwrap() + ); Ok(()) } async fn start_container(&mut self) -> Result<(), anyhow::Error> { - let image = GenericImage::new(&self.builder.container_name, &self.builder.container_tag) .with_exposed_port(9070.tcp()) .with_exposed_port(8080.tcp()) @@ -154,13 +184,17 @@ impl TestContainer { // have to expose entire host network because testcontainer-rs doesn't implement selective SSH port forward from host // see https://github.com/testcontainers/testcontainers-rs/issues/535 - self.container = Some(ContainerRequest::from(image) - .with_host("host.docker.internal" , testcontainers::core::Host::HostGateway) - .start() - .await?); + self.container = Some( + ContainerRequest::from(image) + .with_host( + "host.docker.internal", + testcontainers::core::Host::HostGateway, + ) + .start() + .await?, + ); if self.builder.logging { - let container_stdout = self.container.as_ref().unwrap().stdout(true); let mut stdout_lines = container_stdout.lines(); @@ -206,7 +240,6 @@ impl TestContainer { self.stderr_logging = Some(stderr_logging); } - let host = self.container.as_ref().unwrap().get_host().await?; let ports = self.container.as_ref().unwrap().ports().await?; @@ -222,8 +255,10 @@ impl TestContainer { } async fn register_endpoint(&mut self) -> Result<(), HandlerError> { - - info!("registering endpoint server: {}", self.endpoint_server_url.as_ref().unwrap()); + info!( + "registering endpoint server: {}", + self.endpoint_server_url.as_ref().unwrap() + ); let host = self.container.as_ref().unwrap().get_host().await?; let ports = self.container.as_ref().unwrap().ports().await?; @@ -232,26 +267,32 @@ impl TestContainer { let client = reqwest::Client::builder().http2_prior_knowledge().build()?; - let deployment_uri:String = format!("http://host.docker.internal:{}/", self.endpoint_server_port.unwrap()); + let deployment_uri: String = format!( + "http://host.docker.internal:{}/", + self.endpoint_server_port.unwrap() + ); let deployment_payload = RegisterDeploymentRequestHttp { - uri:deployment_uri, - additional_headers:None, - use_http_11: false, - force: false, - dry_run: false }; //, additional_headers: (), use_http_11: (), force: (), dry_run: () } + uri: deployment_uri, + additional_headers: None, + use_http_11: false, + force: false, + dry_run: false, + }; //, additional_headers: (), use_http_11: (), force: (), dry_run: () } let register_admin_url = format!("http://{}:{}/deployments", host, admin_port); - client.post(register_admin_url) + client + .post(register_admin_url) .json(&deployment_payload) - .send().await?; + .send() + .await?; let ingress_port = ports.map_to_host_port_ipv4(8080.tcp()).unwrap(); self.ingress_url = Some(format!("http://{}:{}", host, ingress_port)); info!("ingress url: {}", self.ingress_url.as_ref().unwrap()); - return Ok(()); + Ok(()) } pub fn ingress_url(&self) -> String { @@ -261,7 +302,6 @@ impl TestContainer { impl Drop for TestContainer { fn drop(&mut self) { - // testcontainers-rs already implements stop/rm on drop] // https://docs.rs/testcontainers/latest/testcontainers/ diff --git a/test-env/tests/test_container.rs b/test-env/tests/test_container.rs index e22e9eb..04e15f2 100644 --- a/test-env/tests/test_container.rs +++ b/test-env/tests/test_container.rs @@ -23,7 +23,6 @@ trait MyWorkflow { async fn my_shared_handler(input: String) -> HandlerResult; } - struct MyServiceImpl; impl MyService for MyServiceImpl { @@ -35,38 +34,45 @@ impl MyService for MyServiceImpl { #[tokio::test] async fn test_container() { - tracing_subscriber::fmt::fmt() - .with_max_level(tracing::Level::INFO) // Set the maximum log level - .init(); + .with_max_level(tracing::Level::INFO) // Set the maximum log level + .init(); - let endpoint = Endpoint::builder() - .bind(MyServiceImpl.serve()) - .build(); + let endpoint = Endpoint::builder().bind(MyServiceImpl.serve()).build(); // simple test container intialization with default configuration // let mut test_container = TestContainer::new().start(endpoint).await.unwrap(); // custom test container initialization with builder let test_container = TestContainer::builder() - // optional passthrough logging from the resstate server testcontainer - // prints container logs to tracing::info level - .with_container_logging() - .with_container("docker.io/restatedev/restate".to_string(), "latest".to_string()) - .build() - .start(endpoint).await.unwrap(); + // optional passthrough logging from the resstate server testcontainer + // prints container logs to tracing::info level + .with_container_logging() + .with_container( + "docker.io/restatedev/restate".to_string(), + "latest".to_string(), + ) + .build() + .start(endpoint) + .await + .unwrap(); let ingress_url = test_container.ingress_url(); // call container ingress url for /MyService/my_handler - let resposne = reqwest::Client::new().post(format!("{}/MyService/my_handler", ingress_url)) - .header("Accept", "application/json") - .header("Content-Type", "*/*") - .header("idempotency-key", "abc") - .send().await.unwrap(); - - assert_eq!(resposne.status(), StatusCode::OK); - - info!("/MyService/my_handler response: {:?}", resposne.text().await.unwrap()); - + let response = reqwest::Client::new() + .post(format!("{}/MyService/my_handler", ingress_url)) + .header("Accept", "application/json") + .header("Content-Type", "*/*") + .header("idempotency-key", "abc") + .send() + .await + .unwrap(); + + assert_eq!(response.status(), StatusCode::OK); + + info!( + "/MyService/my_handler response: {:?}", + response.text().await.unwrap() + ); } From 4c46cdc9d36e78180222d62a6d788c8ef3b84c9f Mon Sep 17 00:00:00 2001 From: Kevin Webb Date: Wed, 19 Feb 2025 09:03:19 -0500 Subject: [PATCH 07/12] update comment in test example --- test-env/tests/test_container.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test-env/tests/test_container.rs b/test-env/tests/test_container.rs index 04e15f2..f5e7792 100644 --- a/test-env/tests/test_container.rs +++ b/test-env/tests/test_container.rs @@ -41,7 +41,7 @@ async fn test_container() { let endpoint = Endpoint::builder().bind(MyServiceImpl.serve()).build(); // simple test container intialization with default configuration - // let mut test_container = TestContainer::new().start(endpoint).await.unwrap(); + // let mut test_container = TestContainer::default().start(endpoint).await.unwrap(); // custom test container initialization with builder let test_container = TestContainer::builder() From bf7f48d80405c8ffc4620401042be044055553bd Mon Sep 17 00:00:00 2001 From: Kevin Webb Date: Wed, 19 Feb 2025 09:12:19 -0500 Subject: [PATCH 08/12] fix comment --- test-env/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test-env/src/lib.rs b/test-env/src/lib.rs index c3488d3..219574a 100644 --- a/test-env/src/lib.rs +++ b/test-env/src/lib.rs @@ -139,7 +139,7 @@ impl TestContainer { self.endpoint_server_port.as_ref().unwrap() )); - // boot restate server + // boot enpoint server self.endpoint_task = Some(tokio::spawn(async move { HttpServer::new(endpoint).serve(listener).await; })); From 24129fa5d21da6d8a9a2929f808f605af4d99f6c Mon Sep 17 00:00:00 2001 From: Kevin Webb Date: Wed, 19 Feb 2025 09:21:31 -0500 Subject: [PATCH 09/12] more comment fixes --- test-env/src/lib.rs | 4 ++-- test-env/tests/test_container.rs | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/test-env/src/lib.rs b/test-env/src/lib.rs index 219574a..ea10e77 100644 --- a/test-env/src/lib.rs +++ b/test-env/src/lib.rs @@ -139,14 +139,14 @@ impl TestContainer { self.endpoint_server_port.as_ref().unwrap() )); - // boot enpoint server + // boot endpoint server self.endpoint_task = Some(tokio::spawn(async move { HttpServer::new(endpoint).serve(listener).await; })); let client = reqwest::Client::builder().http2_prior_knowledge().build()?; - // wait for server to respond + // wait for endpoint server to respond let mut retries = 0; while client .get(format!( diff --git a/test-env/tests/test_container.rs b/test-env/tests/test_container.rs index f5e7792..179882f 100644 --- a/test-env/tests/test_container.rs +++ b/test-env/tests/test_container.rs @@ -41,7 +41,7 @@ async fn test_container() { let endpoint = Endpoint::builder().bind(MyServiceImpl.serve()).build(); // simple test container intialization with default configuration - // let mut test_container = TestContainer::default().start(endpoint).await.unwrap(); + //let test_container = TestContainer::default().start(endpoint).await.unwrap(); // custom test container initialization with builder let test_container = TestContainer::builder() From 88ef9a4a0a1473d543ea6f112cf504d9eb8c4200 Mon Sep 17 00:00:00 2001 From: Kevin Webb Date: Wed, 19 Feb 2025 09:46:57 -0500 Subject: [PATCH 10/12] add testing to README --- README.md | 54 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 54 insertions(+) diff --git a/README.md b/README.md index b8ea6b1..d0c4f39 100644 --- a/README.md +++ b/README.md @@ -63,6 +63,60 @@ async fn main() { The SDK uses tokio's [`tracing`](https://docs.rs/tracing/latest/tracing/) crate to generate logs. Just configure it as usual through [`tracing_subscriber`](https://docs.rs/tracing-subscriber/latest/tracing_subscriber/) to get your logs. +### Testing + +The SDK uses [testcontainers](https://rust.testcontainers.org/) to support integration testing using docker deployed restate server. +The `test-env` crate provies a framwork for intializing the test environment, and an integration test example in `test-env/tests/test_container.rs`. + +```rust + +#[tokio::test] +async fn test_container() { + tracing_subscriber::fmt::fmt() + .with_max_level(tracing::Level::INFO) // Set the maximum log level + .init(); + + let endpoint = Endpoint::builder().bind(MyServiceImpl.serve()).build(); + + // simple test container intialization with default configuration + //let test_container = TestContainer::default().start(endpoint).await.unwrap(); + + // custom test container initialization with builder + let test_container = TestContainer::builder() + // optional passthrough logging from the resstate server testcontainer + // prints container logs to tracing::info level + .with_container_logging() + .with_container( + "docker.io/restatedev/restate".to_string(), + "latest".to_string(), + ) + .build() + .start(endpoint) + .await + .unwrap(); + + let ingress_url = test_container.ingress_url(); + + // call container ingress url for /MyService/my_handler + let response = reqwest::Client::new() + .post(format!("{}/MyService/my_handler", ingress_url)) + .header("Accept", "application/json") + .header("Content-Type", "*/*") + .header("idempotency-key", "abc") + .send() + .await + .unwrap(); + + assert_eq!(response.status(), StatusCode::OK); + + info!( + "/MyService/my_handler response: {:?}", + response.text().await.unwrap() + ); +} + +``` + ## Versions The Rust SDK is currently in active development, and might break across releases. From af3613a3f6868c047a5eeff24edbea98f6c43b20 Mon Sep 17 00:00:00 2001 From: Kevin Webb Date: Wed, 19 Feb 2025 09:49:15 -0500 Subject: [PATCH 11/12] readme formatting --- README.md | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index d0c4f39..548d6f7 100644 --- a/README.md +++ b/README.md @@ -65,11 +65,10 @@ Just configure it as usual through [`tracing_subscriber`](https://docs.rs/tracin ### Testing -The SDK uses [testcontainers](https://rust.testcontainers.org/) to support integration testing using docker deployed restate server. -The `test-env` crate provies a framwork for intializing the test environment, and an integration test example in `test-env/tests/test_container.rs`. +The SDK uses [Testcontainers](https://rust.testcontainers.org/) to support integration testing using a Docker-deployed restate server. +The `test-env` crate provides a framework for initializing the test environment, and an integration test example in `test-env/tests/test_container.rs`. ```rust - #[tokio::test] async fn test_container() { tracing_subscriber::fmt::fmt() @@ -114,7 +113,6 @@ async fn test_container() { response.text().await.unwrap() ); } - ``` ## Versions From a4856c7161dd9e668cb7d1da6ab2f11cea43a9d1 Mon Sep 17 00:00:00 2001 From: Kevin Webb Date: Wed, 19 Feb 2025 15:29:09 -0500 Subject: [PATCH 12/12] fix crate name --- README.md | 2 +- test-env/Cargo.toml | 2 +- test-env/tests/test_container.rs | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index 548d6f7..ea39d16 100644 --- a/README.md +++ b/README.md @@ -66,7 +66,7 @@ Just configure it as usual through [`tracing_subscriber`](https://docs.rs/tracin ### Testing The SDK uses [Testcontainers](https://rust.testcontainers.org/) to support integration testing using a Docker-deployed restate server. -The `test-env` crate provides a framework for initializing the test environment, and an integration test example in `test-env/tests/test_container.rs`. +The `restate-sdk-test-env` crate provides a framework for initializing the test environment, and an integration test example in `test-env/tests/test_container.rs`. ```rust #[tokio::test] diff --git a/test-env/Cargo.toml b/test-env/Cargo.toml index 98fc093..c15751b 100644 --- a/test-env/Cargo.toml +++ b/test-env/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "test-env" +name = "restate-sdk-test-env" version = "0.3.2" edition = "2021" description = "Test Utilities for Restate SDK for Rust" diff --git a/test-env/tests/test_container.rs b/test-env/tests/test_container.rs index 179882f..647cf83 100644 --- a/test-env/tests/test_container.rs +++ b/test-env/tests/test_container.rs @@ -1,6 +1,6 @@ use reqwest::StatusCode; use restate_sdk::prelude::*; -use test_env::TestContainer; +use restate_sdk_test_env::TestContainer; use tracing::info; // Should compile