Skip to content

Commit b76f361

Browse files
authored
Merge pull request #5903 from Turbo87/publish-buffers
publish: Avoid redundant buffer copies
2 parents 7ef3238 + d9748c8 commit b76f361

File tree

3 files changed

+78
-91
lines changed

3 files changed

+78
-91
lines changed

cargo-registry-s3/lib.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,12 +37,11 @@ impl Bucket {
3737
}
3838
}
3939

40-
pub fn put<R: std::io::Read + Send + 'static>(
40+
pub fn put<R: Into<Body>>(
4141
&self,
4242
client: &Client,
4343
path: &str,
4444
content: R,
45-
content_length: u64,
4645
content_type: &str,
4746
extra_headers: header::HeaderMap,
4847
) -> Result<Response, Error> {
@@ -58,7 +57,7 @@ impl Bucket {
5857
.header(header::DATE, date)
5958
.header(header::USER_AGENT, "crates.io (https://crates.io)")
6059
.headers(extra_headers)
61-
.body(Body::sized(content, content_length))
60+
.body(content.into())
6261
.timeout(Duration::from_secs(60))
6362
.send()?
6463
.error_for_status()

src/controllers/krate/publish.rs

Lines changed: 64 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,17 @@
11
//! Functionality related to publishing a new crate or version of a crate.
22
33
use crate::auth::AuthCheck;
4+
use axum::body::Bytes;
45
use flate2::read::GzDecoder;
56
use hex::ToHex;
6-
use http::Request;
7+
use hyper::body::Buf;
78
use sha2::{Digest, Sha256};
89
use std::collections::BTreeMap;
910
use std::io::Read;
1011
use std::path::Path;
1112

1213
use crate::controllers::cargo_prelude::*;
14+
use crate::controllers::util::RequestPartsExt;
1315
use crate::models::{
1416
insert_version_owner_action, Category, Crate, DependencyKind, Keyword, NewCrate, NewVersion,
1517
Rights, VersionAction,
@@ -20,7 +22,7 @@ use crate::middleware::log_request::CustomMetadataRequestExt;
2022
use crate::models::token::EndpointScope;
2123
use crate::schema::*;
2224
use crate::util::errors::{cargo_err, AppResult};
23-
use crate::util::{read_fill, read_le_u32, CargoVcsInfo, LimitErrorReader, Maximums};
25+
use crate::util::{CargoVcsInfo, LimitErrorReader, Maximums};
2426
use crate::views::{
2527
EncodableCrate, EncodableCrateDependency, EncodableCrateUpload, GoodCrate, PublishWarnings,
2628
};
@@ -43,27 +45,38 @@ pub const WILDCARD_ERROR_MESSAGE: &str = "wildcard (`*`) dependency constraints
4345
/// Currently blocks the HTTP thread, perhaps some function calls can spawn new
4446
/// threads and return completion or error through other methods a `cargo publish
4547
/// --status` command, via crates.io's front end, or email.
46-
pub async fn publish(mut req: ConduitRequest) -> AppResult<Json<GoodCrate>> {
47-
conduit_compat(move || {
48-
let app = req.app().clone();
48+
pub async fn publish(req: ConduitRequest) -> AppResult<Json<GoodCrate>> {
49+
let (req, body) = req.0.into_parts();
50+
let bytes = body.into_inner();
51+
let (json_bytes, tarball_bytes) = split_body(bytes, &req)?;
52+
53+
let new_crate: EncodableCrateUpload = serde_json::from_slice(&json_bytes)
54+
.map_err(|e| cargo_err(&format_args!("invalid upload request: {e}")))?;
55+
56+
req.add_custom_metadata("crate_name", new_crate.name.to_string());
57+
req.add_custom_metadata("crate_version", new_crate.vers.to_string());
58+
59+
// Make sure required fields are provided
60+
fn empty(s: Option<&String>) -> bool {
61+
s.map_or(true, String::is_empty)
62+
}
4963

50-
// The format of the req.body() of a publish request is as follows:
51-
//
52-
// metadata length
53-
// metadata in JSON about the crate being published
54-
// .crate tarball length
55-
// .crate tarball file
56-
//
57-
// - The metadata is read and interpreted in the parse_new_headers function.
58-
// - The .crate tarball length is read in this function in order to save the size of the file
59-
// in the version record in the database.
60-
// - Then the .crate tarball length is passed to the upload_crate function where the actual
61-
// file is read and uploaded.
64+
// It can have up to three elements per below conditions.
65+
let mut missing = Vec::with_capacity(3);
6266

63-
let new_crate = parse_new_headers(&mut req)?;
67+
if empty(new_crate.description.as_ref()) {
68+
missing.push("description");
69+
}
70+
if empty(new_crate.license.as_ref()) && empty(new_crate.license_file.as_ref()) {
71+
missing.push("license");
72+
}
73+
if !missing.is_empty() {
74+
let message = missing_metadata_error_message(&missing);
75+
return Err(cargo_err(&message));
76+
}
6477

65-
req.add_custom_metadata("crate_name", new_crate.name.to_string());
66-
req.add_custom_metadata("crate_version", new_crate.vers.to_string());
78+
conduit_compat(move || {
79+
let app = req.app().clone();
6780

6881
let conn = app.primary_database.get()?;
6982

@@ -156,13 +169,7 @@ pub async fn publish(mut req: ConduitRequest) -> AppResult<Json<GoodCrate>> {
156169
}
157170
}
158171

159-
// Length of the .crate tarball, which appears after the metadata in the request body.
160-
// TODO: Not sure why we're using the total content length (metadata + .crate file length)
161-
// to compare against the max upload size... investigate that and perhaps change to use
162-
// this file length.
163-
let file_length = read_le_u32(req.body_mut())?;
164-
165-
let content_length = req.body().get_ref().len() as u64;
172+
let content_length = tarball_bytes.len() as u64;
166173

167174
let maximums = Maximums::new(
168175
krate.max_upload_size,
@@ -181,9 +188,7 @@ pub async fn publish(mut req: ConduitRequest) -> AppResult<Json<GoodCrate>> {
181188
let license = new_crate.license.clone();
182189

183190
// Read tarball from request
184-
let mut tarball = Vec::new();
185-
req.body_mut().read_to_end(&mut tarball)?;
186-
let hex_cksum: String = Sha256::digest(&tarball).encode_hex();
191+
let hex_cksum: String = Sha256::digest(&tarball_bytes).encode_hex();
187192

188193
// Persist the new version of this crate
189194
let version = NewVersion::new(
@@ -194,7 +199,7 @@ pub async fn publish(mut req: ConduitRequest) -> AppResult<Json<GoodCrate>> {
194199
license_file,
195200
// Downcast is okay because the file length must be less than the max upload size
196201
// to get here, and max upload sizes are way less than i32 max
197-
file_length as i32,
202+
content_length as i32,
198203
user.id,
199204
hex_cksum.clone(),
200205
links.clone(),
@@ -222,7 +227,8 @@ pub async fn publish(mut req: ConduitRequest) -> AppResult<Json<GoodCrate>> {
222227
let top_versions = krate.top_versions(&conn)?;
223228

224229
let pkg_name = format!("{}-{}", krate.name, vers);
225-
let cargo_vcs_info = verify_tarball(&pkg_name, &tarball, maximums.max_unpack_size)?;
230+
let cargo_vcs_info =
231+
verify_tarball(&pkg_name, &tarball_bytes, maximums.max_unpack_size)?;
226232
let pkg_path_in_vcs = cargo_vcs_info.map(|info| info.path_in_vcs);
227233

228234
if let Some(readme) = new_crate.readme {
@@ -241,7 +247,7 @@ pub async fn publish(mut req: ConduitRequest) -> AppResult<Json<GoodCrate>> {
241247
// Upload crate tarball
242248
app.config
243249
.uploader()
244-
.upload_crate(app.http_client(), tarball, &krate, vers)?;
250+
.upload_crate(app.http_client(), tarball_bytes, &krate, vers)?;
245251

246252
let (features, features2): (BTreeMap<_, _>, BTreeMap<_, _>) =
247253
features.into_iter().partition(|(_k, vals)| {
@@ -300,44 +306,36 @@ fn count_versions_published_today(krate_id: i32, conn: &PgConnection) -> QueryRe
300306
.get_result(conn)
301307
}
302308

303-
/// Used by the `krate::new` function.
304-
///
305-
/// This function parses the JSON headers to interpret the data and validates
306-
/// the data during and after the parsing. Returns crate metadata.
307-
fn parse_new_headers<B: Read>(req: &mut Request<B>) -> AppResult<EncodableCrateUpload> {
308-
// Read the json upload request
309-
let metadata_length = u64::from(read_le_u32(req.body_mut())?);
310-
req.add_custom_metadata("metadata_length", metadata_length);
311-
312-
let max = req.app().config.max_upload_size;
313-
if metadata_length > max {
314-
return Err(cargo_err(&format_args!("max upload size is: {max}")));
315-
}
316-
let mut json = vec![0; metadata_length as usize];
317-
read_fill(req.body_mut(), &mut json)?;
318-
let new: EncodableCrateUpload = serde_json::from_slice(&json)
319-
.map_err(|e| cargo_err(&format_args!("invalid upload request: {e}")))?;
320-
321-
// Make sure required fields are provided
322-
fn empty(s: Option<&String>) -> bool {
323-
s.map_or(true, String::is_empty)
309+
#[instrument(skip_all)]
310+
fn split_body<R: RequestPartsExt>(mut bytes: Bytes, req: &R) -> AppResult<(Bytes, Bytes)> {
311+
// The format of the req.body() of a publish request is as follows:
312+
//
313+
// metadata length
314+
// metadata in JSON about the crate being published
315+
// .crate tarball length
316+
// .crate tarball file
317+
318+
let json_len = bytes.get_u32_le() as usize;
319+
req.add_custom_metadata("metadata_length", json_len);
320+
321+
if json_len > bytes.len() {
322+
return Err(cargo_err(&format!(
323+
"invalid metadata length for remaining payload: {json_len}"
324+
)));
324325
}
325326

326-
// It can have up to three elements per below conditions.
327-
let mut missing = Vec::with_capacity(3);
327+
let json_bytes = bytes.split_to(json_len);
328328

329-
if empty(new.description.as_ref()) {
330-
missing.push("description");
331-
}
332-
if empty(new.license.as_ref()) && empty(new.license_file.as_ref()) {
333-
missing.push("license");
334-
}
335-
if !missing.is_empty() {
336-
let message = missing_metadata_error_message(&missing);
337-
return Err(cargo_err(&message));
329+
let tarball_len = bytes.get_u32_le() as usize;
330+
if tarball_len > bytes.len() {
331+
return Err(cargo_err(&format!(
332+
"invalid metadata length for remaining payload: {tarball_len}"
333+
)));
338334
}
339335

340-
Ok(new)
336+
let tarball_bytes = bytes.split_to(tarball_len);
337+
338+
Ok((json_bytes, tarball_bytes))
341339
}
342340

343341
pub fn missing_metadata_error_message(missing: &[&str]) -> String {

src/uploaders.rs

Lines changed: 12 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,9 @@ use reqwest::{blocking::Client, header};
33

44
use crate::util::errors::{internal, AppResult};
55

6+
use reqwest::blocking::Body;
67
use std::env;
78
use std::fs::{self, File};
8-
use std::io::{Cursor, SeekFrom};
99
use std::path::PathBuf;
1010

1111
use crate::models::Crate;
@@ -109,11 +109,11 @@ impl Uploader {
109109
///
110110
/// This function can panic on an `Self::Local` during development.
111111
/// Production and tests use `Self::S3` which should not panic.
112-
pub fn upload<R: std::io::Read + std::io::Seek + Send + 'static>(
112+
pub fn upload<R: Into<Body>>(
113113
&self,
114114
client: &Client,
115115
path: &str,
116-
mut content: R,
116+
content: R,
117117
content_type: &str,
118118
extra_headers: header::HeaderMap,
119119
upload_bucket: UploadBucket,
@@ -130,16 +130,7 @@ impl Uploader {
130130
};
131131

132132
if let Some(bucket) = bucket {
133-
let content_length = content.seek(SeekFrom::End(0))?;
134-
content.rewind()?;
135-
bucket.put(
136-
client,
137-
path,
138-
content,
139-
content_length,
140-
content_type,
141-
extra_headers,
142-
)?;
133+
bucket.put(client, path, content, content_type, extra_headers)?;
143134
}
144135

145136
Ok(Some(String::from(path)))
@@ -149,7 +140,9 @@ impl Uploader {
149140
let dir = filename.parent().unwrap();
150141
fs::create_dir_all(dir)?;
151142
let mut file = File::create(&filename)?;
152-
std::io::copy(&mut content, &mut file)?;
143+
let mut body = content.into();
144+
let mut buffer = body.buffer()?;
145+
std::io::copy(&mut buffer, &mut file)?;
153146
Ok(filename.to_str().map(String::from))
154147
}
155148
}
@@ -183,15 +176,14 @@ impl Uploader {
183176
}
184177

185178
/// Uploads a crate and returns the checksum of the uploaded crate file.
186-
pub fn upload_crate(
179+
pub fn upload_crate<R: Into<Body>>(
187180
&self,
188181
http_client: &Client,
189-
body: Vec<u8>,
182+
body: R,
190183
krate: &Crate,
191184
vers: &semver::Version,
192185
) -> AppResult<()> {
193186
let path = Uploader::crate_path(&krate.name, &vers.to_string());
194-
let content = Cursor::new(body);
195187
let mut extra_headers = header::HeaderMap::new();
196188
extra_headers.insert(
197189
header::CACHE_CONTROL,
@@ -200,7 +192,7 @@ impl Uploader {
200192
self.upload(
201193
http_client,
202194
&path,
203-
content,
195+
body,
204196
"application/gzip",
205197
extra_headers,
206198
UploadBucket::Default,
@@ -217,7 +209,6 @@ impl Uploader {
217209
readme: String,
218210
) -> Result<()> {
219211
let path = Uploader::readme_path(crate_name, vers);
220-
let content = Cursor::new(readme);
221212
let mut extra_headers = header::HeaderMap::new();
222213
extra_headers.insert(
223214
header::CACHE_CONTROL,
@@ -226,7 +217,7 @@ impl Uploader {
226217
self.upload(
227218
http_client,
228219
&path,
229-
content,
220+
readme,
230221
"text/html",
231222
extra_headers,
232223
UploadBucket::Default,
@@ -241,7 +232,6 @@ impl Uploader {
241232
index: String,
242233
) -> Result<()> {
243234
let path = Uploader::index_path(crate_name);
244-
let content = Cursor::new(index);
245235
let mut extra_headers = header::HeaderMap::new();
246236
extra_headers.insert(
247237
header::CACHE_CONTROL,
@@ -250,7 +240,7 @@ impl Uploader {
250240
self.upload(
251241
http_client,
252242
&path,
253-
content,
243+
index,
254244
"text/plain",
255245
extra_headers,
256246
UploadBucket::Index,

0 commit comments

Comments
 (0)