Skip to content

Commit 8d3d831

Browse files
authored
Support object_store with wasm: Default wasm32-unknown-unknown HttpConnector (#329)
* Implement default wasm32-unknown-unknown (JS platform dependent) HttpConnector. * Disable all test blocks that strictly require the fs feature on wasm32, as well as those that use the MockServer. Very basic test wasm tests, address dropped/unused receiver in wasm bridge code * CI wasm32-unknown-unknown tests * Install node for wasm tests in CI * cargo fmt * Hoist use/imports in wasm HttpService
1 parent e157e2c commit 8d3d831

File tree

11 files changed

+142
-7
lines changed

11 files changed

+142
-7
lines changed

.cargo/config.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
[target.wasm32-unknown-unknown]
2+
rustflags = ['--cfg', 'getrandom_backend="wasm_js"']

.github/workflows/ci.yml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -192,6 +192,13 @@ jobs:
192192
run: rustup target add wasm32-wasip1
193193
- name: Build wasm32-wasip1
194194
run: cargo build --all-features --target wasm32-wasip1
195+
- name: Install wasm-pack
196+
run: curl https://rustwasm.github.io/wasm-pack/installer/init.sh -sSf | sh
197+
- uses: actions/setup-node@v4
198+
with:
199+
node-version: 20
200+
- name: Run wasm32-unknown-unknown tests (via Node)
201+
run: wasm-pack test --node --features http --no-default-features
195202

196203
windows:
197204
name: cargo test LocalFileSystem (win64)

Cargo.toml

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,10 @@ tokio = { version = "1.29.0", features = ["sync", "macros", "rt", "time", "io-ut
6464
[target.'cfg(target_family="unix")'.dev-dependencies]
6565
nix = { version = "0.29.0", features = ["fs"] }
6666

67+
[target.'cfg(all(target_arch = "wasm32", target_os = "unknown"))'.dependencies]
68+
web-time = { version = "1.1.0" }
69+
wasm-bindgen-futures = "0.4.18"
70+
6771
[features]
6872
default = ["fs"]
6973
cloud = ["serde", "serde_json", "quick-xml", "hyper", "reqwest", "reqwest/stream", "chrono/serde", "base64", "rand", "ring", "http-body-util", "form_urlencoded", "serde_urlencoded"]
@@ -84,6 +88,20 @@ regex = "1.11.1"
8488
# The "gzip" feature for reqwest is enabled for an integration test.
8589
reqwest = { version = "0.12", features = ["gzip"] }
8690

91+
[target.'cfg(all(target_arch = "wasm32", target_os = "unknown"))'.dev-dependencies]
92+
wasm-bindgen-test = "*"
93+
94+
[dev-dependencies.getrandom_v03]
95+
package = "getrandom"
96+
version = "0.3"
97+
features = ["wasm_js"]
98+
99+
[dev-dependencies.getrandom_v02]
100+
package = "getrandom"
101+
version = "0.2"
102+
features = ["js"]
103+
87104
[[test]]
88105
name = "get_range_file"
89106
path = "tests/get_range_file.rs"
107+
required-features = ["fs"]

src/client/body.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,14 @@ impl HttpRequestBody {
4949
}
5050
}
5151

52+
#[cfg(all(target_arch = "wasm32", target_os = "unknown"))]
53+
pub(crate) fn into_reqwest(self) -> reqwest::Body {
54+
match self.0 {
55+
Inner::Bytes(b) => b.into(),
56+
Inner::PutPayload(_, payload) => Bytes::from(payload).into(),
57+
}
58+
}
59+
5260
/// Returns true if this body is empty
5361
pub fn is_empty(&self) -> bool {
5462
match &self.0 {

src/client/connection.rs

Lines changed: 59 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -224,6 +224,60 @@ impl HttpService for reqwest::Client {
224224
}
225225
}
226226

227+
#[async_trait]
228+
#[cfg(all(target_arch = "wasm32", target_os = "unknown"))]
229+
impl HttpService for reqwest::Client {
230+
async fn call(&self, req: HttpRequest) -> Result<HttpResponse, HttpError> {
231+
use futures::{
232+
channel::{mpsc, oneshot},
233+
SinkExt, StreamExt, TryStreamExt,
234+
};
235+
use http_body_util::{Empty, StreamBody};
236+
use wasm_bindgen_futures::spawn_local;
237+
238+
let (parts, body) = req.into_parts();
239+
let url = parts.uri.to_string().parse().unwrap();
240+
let mut req = reqwest::Request::new(parts.method, url);
241+
*req.headers_mut() = parts.headers;
242+
*req.body_mut() = Some(body.into_reqwest());
243+
244+
let (mut tx, rx) = mpsc::channel(1);
245+
let (tx_parts, rx_parts) = oneshot::channel();
246+
let res_fut = self.execute(req);
247+
248+
spawn_local(async move {
249+
match res_fut.await.map_err(HttpError::reqwest) {
250+
Err(err) => {
251+
let _ = tx_parts.send(Err(err));
252+
drop(tx);
253+
}
254+
Ok(res) => {
255+
let (mut parts, _) = http::Response::new(Empty::<()>::new()).into_parts();
256+
parts.headers = res.headers().clone();
257+
parts.status = res.status();
258+
let _ = tx_parts.send(Ok(parts));
259+
let mut stream = res.bytes_stream().map_err(HttpError::reqwest);
260+
while let Some(chunk) = stream.next().await {
261+
if let Err(_e) = tx.send(chunk).await {
262+
// Disconnected due to a transitive drop of the receiver
263+
break;
264+
}
265+
}
266+
}
267+
}
268+
});
269+
270+
let parts = rx_parts.await.unwrap()?;
271+
let safe_stream = rx.map(|chunk| {
272+
let frame = hyper::body::Frame::data(chunk?);
273+
Ok(frame)
274+
});
275+
let body = HttpResponseBody::new(StreamBody::new(safe_stream));
276+
277+
Ok(HttpResponse::from_parts(parts, body))
278+
}
279+
}
280+
227281
/// A factory for [`HttpClient`]
228282
pub trait HttpConnector: std::fmt::Debug + Send + Sync + 'static {
229283
/// Create a new [`HttpClient`] with the provided [`ClientOptions`]
@@ -233,32 +287,32 @@ pub trait HttpConnector: std::fmt::Debug + Send + Sync + 'static {
233287
/// [`HttpConnector`] using [`reqwest::Client`]
234288
#[derive(Debug, Default)]
235289
#[allow(missing_copy_implementations)]
236-
#[cfg(not(target_arch = "wasm32"))]
290+
#[cfg(not(all(target_arch = "wasm32", target_os = "wasi")))]
237291
pub struct ReqwestConnector {}
238292

239-
#[cfg(not(target_arch = "wasm32"))]
293+
#[cfg(not(all(target_arch = "wasm32", target_os = "wasi")))]
240294
impl HttpConnector for ReqwestConnector {
241295
fn connect(&self, options: &ClientOptions) -> crate::Result<HttpClient> {
242296
let client = options.client()?;
243297
Ok(HttpClient::new(client))
244298
}
245299
}
246300

247-
#[cfg(target_arch = "wasm32")]
301+
#[cfg(all(target_arch = "wasm32", target_os = "wasi"))]
248302
pub(crate) fn http_connector(
249303
custom: Option<Arc<dyn HttpConnector>>,
250304
) -> crate::Result<Arc<dyn HttpConnector>> {
251305
match custom {
252306
Some(x) => Ok(x),
253307
None => Err(crate::Error::NotSupported {
254-
source: "WASM32 architectures must provide an HTTPConnector"
308+
source: "WASI architectures must provide an HTTPConnector"
255309
.to_string()
256310
.into(),
257311
}),
258312
}
259313
}
260314

261-
#[cfg(not(target_arch = "wasm32"))]
315+
#[cfg(not(all(target_arch = "wasm32", target_os = "wasi")))]
262316
pub(crate) fn http_connector(
263317
custom: Option<Arc<dyn HttpConnector>>,
264318
) -> crate::Result<Arc<dyn HttpConnector>> {

src/client/mod.rs

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ pub(crate) mod backoff;
2222
#[cfg(not(target_arch = "wasm32"))]
2323
mod dns;
2424

25+
#[cfg(not(target_arch = "wasm32"))]
2526
#[cfg(test)]
2627
pub(crate) mod mock_server;
2728

@@ -50,7 +51,7 @@ pub(crate) mod builder;
5051

5152
mod connection;
5253
pub(crate) use connection::http_connector;
53-
#[cfg(not(target_arch = "wasm32"))]
54+
#[cfg(not(all(target_arch = "wasm32", target_os = "wasi")))]
5455
pub use connection::ReqwestConnector;
5556
pub use connection::{HttpClient, HttpConnector, HttpError, HttpErrorKind, HttpService};
5657

@@ -718,6 +719,22 @@ impl ClientOptions {
718719
.build()
719720
.map_err(map_client_error)
720721
}
722+
723+
#[cfg(all(target_arch = "wasm32", target_os = "unknown"))]
724+
pub(crate) fn client(&self) -> Result<reqwest::Client> {
725+
let mut builder = reqwest::ClientBuilder::new();
726+
727+
match &self.user_agent {
728+
Some(user_agent) => builder = builder.user_agent(user_agent.get()?),
729+
None => builder = builder.user_agent(DEFAULT_USER_AGENT),
730+
}
731+
732+
if let Some(headers) = &self.default_headers {
733+
builder = builder.default_headers(headers.clone())
734+
}
735+
736+
builder.build().map_err(map_client_error)
737+
}
721738
}
722739

723740
pub(crate) trait GetOptionsExt {

src/client/retry.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,11 @@ use futures::future::BoxFuture;
2626
use http::{Method, Uri};
2727
use reqwest::header::LOCATION;
2828
use reqwest::StatusCode;
29+
#[cfg(not(all(target_arch = "wasm32", target_os = "unknown")))]
2930
use std::time::{Duration, Instant};
3031
use tracing::info;
32+
#[cfg(all(target_arch = "wasm32", target_os = "unknown"))]
33+
use web_time::{Duration, Instant};
3134

3235
/// Retry request error
3336
#[derive(Debug, thiserror::Error)]
@@ -469,6 +472,7 @@ impl RetryExt for HttpRequestBuilder {
469472
}
470473
}
471474

475+
#[cfg(not(target_arch = "wasm32"))]
472476
#[cfg(test)]
473477
mod tests {
474478
use crate::client::mock_server::MockServer;

src/parse.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -348,7 +348,7 @@ mod tests {
348348
}
349349

350350
#[tokio::test]
351-
#[cfg(feature = "http")]
351+
#[cfg(all(feature = "http", not(target_arch = "wasm32")))]
352352
async fn test_url_http() {
353353
use crate::client::mock_server::MockServer;
354354
use http::{header::USER_AGENT, Response};

src/prefix.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -221,6 +221,7 @@ impl<T: ObjectStore> ObjectStore for PrefixStore<T> {
221221
}
222222
}
223223

224+
#[cfg(not(target_arch = "wasm32"))]
224225
#[cfg(test)]
225226
mod tests {
226227
use super::*;

src/throttle.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -597,6 +597,7 @@ mod tests {
597597
}
598598

599599
#[allow(dead_code)]
600+
#[cfg(target_os = "linux")]
600601
async fn measure_get(store: &ThrottledStore<InMemory>, n_bytes: Option<usize>) -> Duration {
601602
let path = place_test_object(store, n_bytes).await;
602603

tests/http.rs

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,9 @@
2020
#[cfg(feature = "http")]
2121
use object_store::{http::HttpBuilder, path::Path, GetOptions, GetRange, ObjectStore};
2222

23+
#[cfg(all(feature = "http", target_arch = "wasm32", target_os = "unknown"))]
24+
use wasm_bindgen_test::*;
25+
2326
/// Tests that even when reqwest has the `gzip` feature enabled, the HTTP store
2427
/// does not error on a missing `Content-Length` header.
2528
#[tokio::test]
@@ -41,3 +44,23 @@ async fn test_http_store_gzip() {
4144
.await
4245
.unwrap();
4346
}
47+
48+
#[cfg(all(feature = "http", target_arch = "wasm32", target_os = "unknown"))]
49+
#[wasm_bindgen_test]
50+
async fn basic_wasm_get() {
51+
let http_store = HttpBuilder::new()
52+
.with_url("https://raw.githubusercontent.com/apache/arrow-rs/refs/heads/main")
53+
.build()
54+
.unwrap();
55+
56+
let _ = http_store
57+
.get_opts(
58+
&Path::parse("LICENSE.txt").unwrap(),
59+
GetOptions {
60+
range: Some(GetRange::Bounded(0..100)),
61+
..Default::default()
62+
},
63+
)
64+
.await
65+
.unwrap();
66+
}

0 commit comments

Comments
 (0)