Skip to content

Moved hyper support in a separate module. #14

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Aug 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,15 @@ repository = "https://github.com/restatedev/sdk-rust"

[features]
default = ["http_server", "rand", "uuid"]
http_server = ["hyper", "http-body-util", "hyper-util", "tokio/net", "tokio/signal", "restate-sdk-shared-core/http"]
hyper = ["dep:hyper", "http-body-util", "restate-sdk-shared-core/http"]
http_server = ["hyper", "hyper/server", "hyper/http2", "hyper-util", "tokio/net", "tokio/signal"]

[dependencies]
bytes = "1.6.1"
futures = "0.3"
http = "1.1.0"
http-body-util = { version = "0.1", optional = true }
hyper = { version = "1.4.1", optional = true, features = ["server", "http2"] }
hyper = { version = "1.4.1", optional = true}
hyper-util = { version = "0.1", features = ["tokio", "server", "server-graceful", "http2"], optional = true }
pin-project-lite = "0.2"
rand = { version = "0.8.5", optional = true }
Expand All @@ -25,7 +26,7 @@ restate-sdk-shared-core = { version = "0.0.5" }
serde = "1.0"
serde_json = "1.0"
thiserror = "1.0.63"
tokio = { version = "1", default-features = false, features = ["sync", "macros"] }
tokio = { version = "1", default-features = false, features = ["sync"] }
tower-service = "0.3"
tracing = "0.1"
uuid = { version = "1.10.0", optional = true }
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ impl Greeter for GreeterImpl {
async fn main() {
// To enable logging/tracing
// tracing_subscriber::fmt::init();
HyperServer::new(
HttpServer::new(
Endpoint::builder()
.with_service(GreeterImpl.serve())
.build(),
Expand Down
2 changes: 1 addition & 1 deletion examples/counter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ impl Counter for CounterImpl {
#[tokio::main]
async fn main() {
tracing_subscriber::fmt::init();
HyperServer::new(
HttpServer::new(
Endpoint::builder()
.with_service(CounterImpl.serve())
.build(),
Expand Down
2 changes: 1 addition & 1 deletion examples/failures.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ impl FailureExample for FailureExampleImpl {
#[tokio::main]
async fn main() {
tracing_subscriber::fmt::init();
HyperServer::new(
HttpServer::new(
Endpoint::builder()
.with_service(FailureExampleImpl.serve())
.build(),
Expand Down
2 changes: 1 addition & 1 deletion examples/greeter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ impl Greeter for GreeterImpl {
#[tokio::main]
async fn main() {
tracing_subscriber::fmt::init();
HyperServer::new(
HttpServer::new(
Endpoint::builder()
.with_service(GreeterImpl.serve())
.build(),
Expand Down
2 changes: 1 addition & 1 deletion examples/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ impl RunExample for RunExampleImpl {
#[tokio::main]
async fn main() {
tracing_subscriber::fmt::init();
HyperServer::new(
HttpServer::new(
Endpoint::builder()
.with_service(RunExampleImpl(reqwest::Client::new()).serve())
.build(),
Expand Down
79 changes: 79 additions & 0 deletions src/http_server.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
use crate::endpoint::Endpoint;
use crate::hyper::HyperEndpoint;
use futures::FutureExt;
use hyper::server::conn::http2;
use hyper_util::rt::{TokioExecutor, TokioIo};
use std::future::Future;
use std::net::SocketAddr;
use std::time::Duration;
use tokio::net::TcpListener;
use tracing::{info, warn};

pub struct HttpServer {
endpoint: Endpoint,
}

impl From<Endpoint> for HttpServer {
fn from(endpoint: Endpoint) -> Self {
Self { endpoint }
}
}

impl HttpServer {
pub fn new(endpoint: Endpoint) -> Self {
Self { endpoint }
}

pub async fn listen_and_serve(self, addr: SocketAddr) {
let listener = TcpListener::bind(addr).await.expect("listener can bind");
self.serve(listener).await;
}

pub async fn serve(self, listener: TcpListener) {
self.serve_with_cancel(listener, tokio::signal::ctrl_c().map(|_| ()))
.await;
}

pub async fn serve_with_cancel(self, listener: TcpListener, cancel_signal_future: impl Future) {
let endpoint = HyperEndpoint::new(self.endpoint);
let graceful = hyper_util::server::graceful::GracefulShutdown::new();

// when this signal completes, start shutdown
let mut signal = std::pin::pin!(cancel_signal_future);

info!("Starting listening on {}", listener.local_addr().unwrap());

// Our server accept loop
loop {
tokio::select! {
Ok((stream, remote)) = listener.accept() => {
let endpoint = endpoint.clone();

let conn = http2::Builder::new(TokioExecutor::default())
.serve_connection(TokioIo::new(stream), endpoint);

let fut = graceful.watch(conn);

tokio::spawn(async move {
if let Err(e) = fut.await {
warn!("Error serving connection {remote}: {:?}", e);
}
});
},
_ = &mut signal => {
info!("Shutting down");
// stop the accept loop
break;
}
}
}

// Wait graceful shutdown
tokio::select! {
_ = graceful.shutdown() => {},
_ = tokio::time::sleep(Duration::from_secs(10)) => {
warn!("Timed out waiting for all connections to close");
}
}
}
}
88 changes: 9 additions & 79 deletions src/http.rs → src/hyper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,98 +3,29 @@ use crate::endpoint::{Endpoint, InputReceiver, OutputSender};
use bytes::Bytes;
use futures::future::BoxFuture;
use futures::{FutureExt, TryStreamExt};
use http::header::CONTENT_TYPE;
use http::{response, Request, Response};
use http_body_util::{BodyExt, Either, Full};
use hyper::body::{Body, Frame, Incoming};
use hyper::header::CONTENT_TYPE;
use hyper::http::response;
use hyper::server::conn::http2;
use hyper::service::Service;
use hyper::{Request, Response};
use hyper_util::rt::{TokioExecutor, TokioIo};
use restate_sdk_shared_core::ResponseHead;
use std::convert::Infallible;
use std::future::{ready, Future, Ready};
use std::net::SocketAddr;
use std::future::{ready, Ready};
use std::ops::Deref;
use std::pin::Pin;
use std::task::{ready, Context, Poll};
use std::time::Duration;
use tokio::net::TcpListener;
use tokio::sync::mpsc;
use tracing::{info, warn};
use tracing::warn;

pub struct HyperServer {
endpoint: Endpoint,
}

impl From<Endpoint> for HyperServer {
fn from(endpoint: Endpoint) -> Self {
Self { endpoint }
}
}
#[derive(Clone)]
pub struct HyperEndpoint(Endpoint);

impl HyperServer {
impl HyperEndpoint {
pub fn new(endpoint: Endpoint) -> Self {
Self { endpoint }
}

pub async fn listen_and_serve(self, addr: SocketAddr) {
let listener = TcpListener::bind(addr).await.expect("listener can bind");
self.serve(listener).await;
}

pub async fn serve(self, listener: TcpListener) {
self.serve_with_cancel(listener, tokio::signal::ctrl_c().map(|_| ()))
.await;
}

pub async fn serve_with_cancel(self, listener: TcpListener, cancel_signal_future: impl Future) {
let endpoint = HyperEndpoint(self.endpoint);
let graceful = hyper_util::server::graceful::GracefulShutdown::new();

// when this signal completes, start shutdown
let mut signal = std::pin::pin!(cancel_signal_future);

info!("Starting listening on {}", listener.local_addr().unwrap());

// Our server accept loop
loop {
tokio::select! {
Ok((stream, remote)) = listener.accept() => {
let endpoint = endpoint.clone();

let conn = http2::Builder::new(TokioExecutor::default())
.serve_connection(TokioIo::new(stream), endpoint);

let fut = graceful.watch(conn);

tokio::spawn(async move {
if let Err(e) = fut.await {
warn!("Error serving connection {remote}: {:?}", e);
}
});
},
_ = &mut signal => {
info!("Shutting down");
// stop the accept loop
break;
}
}
}

// Wait graceful shutdown
tokio::select! {
_ = graceful.shutdown() => {},
_ = tokio::time::sleep(Duration::from_secs(10)) => {
warn!("Timed out waiting for all connections to close");
}
}
Self(endpoint)
}
}

#[derive(Clone)]
struct HyperEndpoint(Endpoint);

impl Service<Request<Incoming>> for HyperEndpoint {
type Response = Response<Either<Full<Bytes>, BidiStreamRunner>>;
type Error = endpoint::Error;
Expand Down Expand Up @@ -155,8 +86,7 @@ fn response_builder_from_response_head(response_head: ResponseHead) -> response:
response_builder
}

// TODO use pin_project
struct BidiStreamRunner {
pub struct BidiStreamRunner {
fut: Option<BoxFuture<'static, Result<(), endpoint::Error>>>,
output_rx: mpsc::UnboundedReceiver<Bytes>,
end_stream: bool,
Expand Down
6 changes: 4 additions & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,16 @@ pub mod context;
pub mod discovery;
pub mod errors;
#[cfg(feature = "http_server")]
pub mod http;
pub mod http_server;
#[cfg(feature = "hyper")]
pub mod hyper;
pub mod serde;

pub use restate_sdk_macros::{object, service, workflow};

pub mod prelude {
#[cfg(feature = "http_server")]
pub use crate::http::HyperServer;
pub use crate::http_server::HttpServer;

pub use crate::context::{
Context, ContextAwakeables, ContextClient, ContextPromises, ContextReadState,
Expand Down
4 changes: 2 additions & 2 deletions test-services/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ mod non_deterministic;
mod proxy;
mod test_utils_service;

use restate_sdk::prelude::{Endpoint, HyperServer};
use restate_sdk::prelude::{Endpoint, HttpServer};
use std::env;

#[tokio::main]
Expand Down Expand Up @@ -77,7 +77,7 @@ async fn main() {
))
}

HyperServer::new(builder.build())
HttpServer::new(builder.build())
.listen_and_serve(format!("0.0.0.0:{port}").parse().unwrap())
.await;
}
2 changes: 1 addition & 1 deletion tests/ui/shared_handler_in_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ impl SharedHandlerInService for SharedHandlerInServiceImpl {
#[tokio::main]
async fn main() {
tracing_subscriber::fmt::init();
HyperServer::new(
HttpServer::new(
Endpoint::builder()
.with_service(SharedHandlerInServiceImpl.serve())
.build(),
Expand Down
Loading