Skip to content

Commit 6acd5e2

Browse files
committed
Upgrade to hyper-0.13
1 parent 2c3032d commit 6acd5e2

File tree

8 files changed

+117
-119
lines changed

8 files changed

+117
-119
lines changed

Cargo.toml

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -10,15 +10,15 @@ edition = "2018"
1010

1111
[dependencies]
1212
conduit = "0.8"
13-
futures-preview = "0.3.0-alpha.19"
14-
hyper = { version = "0.13.0-alpha.4", features = ["unstable-stream"] }
15-
http = "0.1"
13+
futures = "0.3"
14+
hyper = "0.13"
15+
http = "0.2"
1616
tracing = { version = "0.1", features = ["log"] }
1717
semver = "0.5" # Must match version in conduit for now
18-
tokio-executor = { version = "0.2.0-alpha.6", features = ["threadpool"] }
19-
tower-service = "0.3.0-alpha.2"
18+
tokio = { version = "0.2", features = ["blocking", "rt-threaded"] }
19+
tower-service = "0.3"
2020

2121
[dev-dependencies]
2222
conduit-router = "0.8"
23-
env_logger = "0.6"
24-
tokio = { version = "0.2.0-alpha.2", default-features = false, features = ["rt-full"] }
23+
env_logger = "0.7"
24+
tokio = { version = "0.2", features = ["macros"] }

examples/server.rs

Lines changed: 5 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -3,28 +3,20 @@
33
use conduit::{Handler, Request, Response};
44
use conduit_hyper::Server;
55
use conduit_router::RouteBuilder;
6-
use tokio::runtime;
76

87
use std::collections::HashMap;
98
use std::io::{Cursor, Error};
109
use std::thread::sleep;
1110

12-
fn main() {
11+
#[tokio::main]
12+
async fn main() {
1313
env_logger::init();
1414

1515
let app = build_conduit_handler();
1616
let addr = ([127, 0, 0, 1], 12345).into();
17-
let server = Server::bind(&addr, app);
18-
19-
let rt = runtime::Builder::new()
20-
// Set the max number of concurrent requests (tokio defaults to 100)
21-
.blocking_threads(2)
22-
.build()
23-
.unwrap();
24-
rt.spawn(async {
25-
server.await;
26-
});
27-
rt.shutdown_on_idle();
17+
18+
// FIXME: Set limit on number of blocking tasks
19+
Server::serve(&addr, app).await;
2820
}
2921

3022
fn build_conduit_handler() -> impl Handler {

src/adaptor.rs

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,18 +15,18 @@ use std::path::{Component, Path, PathBuf};
1515

1616
use conduit::{Extensions, Headers, Host, Method, Request, Scheme};
1717
use http::{request::Parts as HttpParts, HeaderMap};
18-
use hyper::{Chunk, Method as HyperMethod, Version as HttpVersion};
18+
use hyper::{body::Bytes, Method as HyperMethod, Version as HttpVersion};
1919
use semver::Version;
2020

2121
/// Owned data consumed by the background thread
2222
///
2323
/// `ConduitRequest` cannot be sent between threads, so the needed request data
2424
/// is extracted from hyper on a core thread and taken by the background thread.
25-
pub(crate) struct RequestInfo(Option<(Parts, Chunk)>);
25+
pub(crate) struct RequestInfo(Option<(Parts, Bytes)>);
2626

2727
impl RequestInfo {
2828
/// Save the request info that can be sent between threads
29-
pub(crate) fn new(parts: HttpParts, body: Chunk) -> Self {
29+
pub(crate) fn new(parts: HttpParts, body: Bytes) -> Self {
3030
let tuple = (Parts(parts), body);
3131
Self(Some(tuple))
3232
}
@@ -38,7 +38,7 @@ impl RequestInfo {
3838
/// # Panics
3939
///
4040
/// Panics if called more than once on a value.
41-
fn take(&mut self) -> (Parts, Chunk) {
41+
fn take(&mut self) -> (Parts, Bytes) {
4242
self.0.take().expect("called take multiple times")
4343
}
4444
}
@@ -94,7 +94,7 @@ pub(crate) struct ConduitRequest {
9494
parts: Parts,
9595
path: String,
9696
remote_addr: SocketAddr,
97-
body: Cursor<Chunk>,
97+
body: Cursor<Bytes>,
9898
extensions: Extensions, // makes struct non-Send
9999
}
100100

@@ -148,6 +148,8 @@ impl Request for ConduitRequest {
148148
HttpVersion::HTTP_10 => version(1, 0),
149149
HttpVersion::HTTP_11 => version(1, 1),
150150
HttpVersion::HTTP_2 => version(2, 0),
151+
HttpVersion::HTTP_3 => version(3, 0),
152+
_ => version(0, 0),
151153
}
152154
}
153155

src/lib.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
#![deny(warnings, clippy::all, missing_debug_implementations)]
1+
#![deny(warnings, clippy::all, missing_debug_implementations, rust_2018_idioms)]
22
#![allow(clippy::needless_doctest_main, clippy::unknown_clippy_lints)] // using the tokio::main attribute
33

44
//! A wrapper for integrating `hyper 0.13` with a `conduit 0.8` blocking application stack.
@@ -16,14 +16,13 @@
1616
//! ```no_run
1717
//! use conduit::Handler;
1818
//! use conduit_hyper::Server;
19-
//! use futures::executor::block_on;
2019
//! use tokio::runtime::Runtime;
2120
//!
2221
//! #[tokio::main]
2322
//! async fn main() {
2423
//! let app = build_conduit_handler();
2524
//! let addr = ([127, 0, 0, 1], 12345).into();
26-
//! let server = Server::bind(&addr, app);
25+
//! let server = Server::serve(&addr, app);
2726
//!
2827
//! server.await;
2928
//! }

src/server.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ impl Server {
1919
/// `tokio::Runtime` it is not possible to furter configure the `hyper::Server`. If more
2020
/// control, such as configuring a graceful shutdown is necessary, then call
2121
/// `Service::from_conduit` instead.
22-
pub fn bind<H: conduit::Handler>(addr: &SocketAddr, handler: H) -> impl Future {
22+
pub fn serve<H: conduit::Handler>(addr: &SocketAddr, handler: H) -> impl Future {
2323
let handler = Arc::new(handler);
2424
let make_service = make_service_fn(move |socket: &AddrStream| {
2525
let handler = handler.clone();

src/service.rs

Lines changed: 22 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,12 @@ use std::future::Future;
44
use std::net::SocketAddr;
55
use std::sync::Arc;
66

7-
use futures::prelude::*;
8-
use hyper::{service, Body, Error, Request, Response, StatusCode};
7+
use hyper::{service, Body, Request, Response, StatusCode};
98
use tracing::error;
109

10+
mod error;
11+
pub use error::ServiceError;
12+
1113
/// A builder for a `hyper::Service`.
1214
#[derive(Debug)]
1315
pub struct Service;
@@ -54,10 +56,10 @@ impl Service {
5456
impl tower_service::Service<
5557
Request<Body>,
5658
Response = Response<Body>,
57-
Error = Error,
58-
Future = impl Future<Output = Result<Response<Body>, Error>> + Send + 'static,
59+
Error = ServiceError,
60+
Future = impl Future<Output = Result<Response<Body>, ServiceError>> + Send + 'static,
5961
>,
60-
Error,
62+
ServiceError,
6163
> {
6264
Ok(service::service_fn(move |request: Request<Body>| {
6365
blocking_handler(handler.clone(), request, remote_addr)
@@ -70,26 +72,22 @@ pub(crate) async fn blocking_handler<H: conduit::Handler>(
7072
handler: Arc<H>,
7173
request: Request<Body>,
7274
remote_addr: SocketAddr,
73-
) -> Result<Response<Body>, Error> {
75+
) -> Result<Response<Body>, ServiceError> {
7476
let (parts, body) = request.into_parts();
75-
let full_body = body.try_concat().await?;
76-
let mut request_info = RequestInfo::new(parts, full_body);
7777

78-
let future = future::poll_fn(move |_| {
79-
tokio_executor::threadpool::blocking(|| {
80-
let mut request = ConduitRequest::new(&mut request_info, remote_addr);
81-
handler
82-
.call(&mut request)
83-
.map(good_response)
84-
.unwrap_or_else(|e| error_response(&e.to_string()))
85-
})
86-
.map_err(|_| panic!("The threadpool shut down"))
87-
});
78+
let full_body = hyper::body::to_bytes(body).await?;
79+
let mut request_info = RequestInfo::new(parts, full_body);
8880

89-
// Spawn as a new top-level task, otherwise the parent task is blocked as well
90-
let (future, handle) = future.remote_handle();
91-
tokio_executor::spawn(future);
92-
handle.await
81+
// FIXME: Provide a configurable limit on the number of blocking tasks
82+
tokio::task::spawn_blocking(move || {
83+
let mut request = ConduitRequest::new(&mut request_info, remote_addr);
84+
handler
85+
.call(&mut request)
86+
.map(good_response)
87+
.unwrap_or_else(|e| error_response(&e.to_string()))
88+
})
89+
.await
90+
.map_err(Into::into)
9391
}
9492

9593
/// Builds a `hyper::Response` given a `conduit:Response`
@@ -104,15 +102,15 @@ fn good_response(mut response: conduit::Response) -> Response<Body> {
104102
Ok(s) => s,
105103
Err(e) => return error_response(&e.to_string()),
106104
};
107-
builder.status(status);
108105

109106
for (key, values) in response.headers {
110107
for value in values {
111-
builder.header(key.as_str(), value.as_str());
108+
builder = builder.header(key.as_str(), value.as_str());
112109
}
113110
}
114111

115112
builder
113+
.status(status)
116114
.body(body.into())
117115
.unwrap_or_else(|e| error_response(&e.to_string()))
118116
}

src/service/error.rs

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
use std::error::Error;
2+
3+
use tokio::task::JoinError;
4+
5+
#[derive(Debug)]
6+
pub enum ServiceError {
7+
JoinError(std::io::Error),
8+
Hyper(hyper::Error),
9+
}
10+
11+
impl Error for ServiceError {}
12+
13+
impl std::fmt::Display for ServiceError {
14+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
15+
match self {
16+
ServiceError::JoinError(e) => e.fmt(f),
17+
ServiceError::Hyper(e) => e.fmt(f),
18+
}
19+
}
20+
}
21+
22+
impl From<JoinError> for ServiceError {
23+
fn from(e: JoinError) -> Self {
24+
ServiceError::JoinError(e.into())
25+
}
26+
}
27+
28+
impl From<hyper::Error> for ServiceError {
29+
fn from(e: hyper::Error) -> Self {
30+
ServiceError::Hyper(e)
31+
}
32+
}

0 commit comments

Comments
 (0)