Skip to content

publish: Avoid redundant buffer copies #5903

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jan 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions cargo-registry-s3/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,11 @@ impl Bucket {
}
}

pub fn put<R: std::io::Read + Send + 'static>(
pub fn put<R: Into<Body>>(
&self,
client: &Client,
path: &str,
content: R,
content_length: u64,
content_type: &str,
extra_headers: header::HeaderMap,
) -> Result<Response, Error> {
Expand All @@ -58,7 +57,7 @@ impl Bucket {
.header(header::DATE, date)
.header(header::USER_AGENT, "crates.io (https://crates.io)")
.headers(extra_headers)
.body(Body::sized(content, content_length))
.body(content.into())
.timeout(Duration::from_secs(60))
.send()?
.error_for_status()
Expand Down
130 changes: 64 additions & 66 deletions src/controllers/krate/publish.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
//! Functionality related to publishing a new crate or version of a crate.

use crate::auth::AuthCheck;
use axum::body::Bytes;
use flate2::read::GzDecoder;
use hex::ToHex;
use http::Request;
use hyper::body::Buf;
use sha2::{Digest, Sha256};
use std::collections::BTreeMap;
use std::io::Read;
use std::path::Path;

use crate::controllers::cargo_prelude::*;
use crate::controllers::util::RequestPartsExt;
use crate::models::{
insert_version_owner_action, Category, Crate, DependencyKind, Keyword, NewCrate, NewVersion,
Rights, VersionAction,
Expand All @@ -20,7 +22,7 @@ use crate::middleware::log_request::CustomMetadataRequestExt;
use crate::models::token::EndpointScope;
use crate::schema::*;
use crate::util::errors::{cargo_err, AppResult};
use crate::util::{read_fill, read_le_u32, CargoVcsInfo, LimitErrorReader, Maximums};
use crate::util::{CargoVcsInfo, LimitErrorReader, Maximums};
use crate::views::{
EncodableCrate, EncodableCrateDependency, EncodableCrateUpload, GoodCrate, PublishWarnings,
};
Expand All @@ -43,27 +45,38 @@ pub const WILDCARD_ERROR_MESSAGE: &str = "wildcard (`*`) dependency constraints
/// Currently blocks the HTTP thread. Perhaps some function calls could spawn new
/// threads and report completion or errors through other means: a `cargo publish
/// --status` command, crates.io's front end, or email.
pub async fn publish(mut req: ConduitRequest) -> AppResult<Json<GoodCrate>> {
conduit_compat(move || {
let app = req.app().clone();
pub async fn publish(req: ConduitRequest) -> AppResult<Json<GoodCrate>> {
let (req, body) = req.0.into_parts();
let bytes = body.into_inner();
let (json_bytes, tarball_bytes) = split_body(bytes, &req)?;

let new_crate: EncodableCrateUpload = serde_json::from_slice(&json_bytes)
.map_err(|e| cargo_err(&format_args!("invalid upload request: {e}")))?;

req.add_custom_metadata("crate_name", new_crate.name.to_string());
req.add_custom_metadata("crate_version", new_crate.vers.to_string());

// Make sure required fields are provided
fn empty(s: Option<&String>) -> bool {
s.map_or(true, String::is_empty)
}

// The format of the req.body() of a publish request is as follows:
//
// metadata length
// metadata in JSON about the crate being published
// .crate tarball length
// .crate tarball file
//
// - The metadata is read and interpreted in the parse_new_headers function.
// - The .crate tarball length is read in this function in order to save the size of the file
// in the version record in the database.
// - Then the .crate tarball length is passed to the upload_crate function where the actual
// file is read and uploaded.
// It can have up to three elements per below conditions.
let mut missing = Vec::with_capacity(3);

let new_crate = parse_new_headers(&mut req)?;
if empty(new_crate.description.as_ref()) {
missing.push("description");
}
if empty(new_crate.license.as_ref()) && empty(new_crate.license_file.as_ref()) {
missing.push("license");
}
if !missing.is_empty() {
let message = missing_metadata_error_message(&missing);
return Err(cargo_err(&message));
}

req.add_custom_metadata("crate_name", new_crate.name.to_string());
req.add_custom_metadata("crate_version", new_crate.vers.to_string());
conduit_compat(move || {
let app = req.app().clone();

let conn = app.primary_database.get()?;

Expand Down Expand Up @@ -156,13 +169,7 @@ pub async fn publish(mut req: ConduitRequest) -> AppResult<Json<GoodCrate>> {
}
}

// Length of the .crate tarball, which appears after the metadata in the request body.
// TODO: Not sure why we're using the total content length (metadata + .crate file length)
// to compare against the max upload size... investigate that and perhaps change to use
// this file length.
let file_length = read_le_u32(req.body_mut())?;

let content_length = req.body().get_ref().len() as u64;
let content_length = tarball_bytes.len() as u64;

let maximums = Maximums::new(
krate.max_upload_size,
Expand All @@ -181,9 +188,7 @@ pub async fn publish(mut req: ConduitRequest) -> AppResult<Json<GoodCrate>> {
let license = new_crate.license.clone();

// Read tarball from request
let mut tarball = Vec::new();
req.body_mut().read_to_end(&mut tarball)?;
let hex_cksum: String = Sha256::digest(&tarball).encode_hex();
let hex_cksum: String = Sha256::digest(&tarball_bytes).encode_hex();

// Persist the new version of this crate
let version = NewVersion::new(
Expand All @@ -194,7 +199,7 @@ pub async fn publish(mut req: ConduitRequest) -> AppResult<Json<GoodCrate>> {
license_file,
// Downcast is okay because the file length must be less than the max upload size
// to get here, and max upload sizes are way less than i32 max
file_length as i32,
content_length as i32,
user.id,
hex_cksum.clone(),
links.clone(),
Expand Down Expand Up @@ -222,7 +227,8 @@ pub async fn publish(mut req: ConduitRequest) -> AppResult<Json<GoodCrate>> {
let top_versions = krate.top_versions(&conn)?;

let pkg_name = format!("{}-{}", krate.name, vers);
let cargo_vcs_info = verify_tarball(&pkg_name, &tarball, maximums.max_unpack_size)?;
let cargo_vcs_info =
verify_tarball(&pkg_name, &tarball_bytes, maximums.max_unpack_size)?;
let pkg_path_in_vcs = cargo_vcs_info.map(|info| info.path_in_vcs);

if let Some(readme) = new_crate.readme {
Expand All @@ -241,7 +247,7 @@ pub async fn publish(mut req: ConduitRequest) -> AppResult<Json<GoodCrate>> {
// Upload crate tarball
app.config
.uploader()
.upload_crate(app.http_client(), tarball, &krate, vers)?;
.upload_crate(app.http_client(), tarball_bytes, &krate, vers)?;

let (features, features2): (BTreeMap<_, _>, BTreeMap<_, _>) =
features.into_iter().partition(|(_k, vals)| {
Expand Down Expand Up @@ -300,44 +306,36 @@ fn count_versions_published_today(krate_id: i32, conn: &PgConnection) -> QueryRe
.get_result(conn)
}

/// Used by the `krate::new` function.
///
/// This function parses the JSON headers to interpret the data and validates
/// the data during and after the parsing. Returns crate metadata.
fn parse_new_headers<B: Read>(req: &mut Request<B>) -> AppResult<EncodableCrateUpload> {
// Read the json upload request
let metadata_length = u64::from(read_le_u32(req.body_mut())?);
req.add_custom_metadata("metadata_length", metadata_length);

let max = req.app().config.max_upload_size;
if metadata_length > max {
return Err(cargo_err(&format_args!("max upload size is: {max}")));
}
let mut json = vec![0; metadata_length as usize];
read_fill(req.body_mut(), &mut json)?;
let new: EncodableCrateUpload = serde_json::from_slice(&json)
.map_err(|e| cargo_err(&format_args!("invalid upload request: {e}")))?;

// Make sure required fields are provided
fn empty(s: Option<&String>) -> bool {
s.map_or(true, String::is_empty)
#[instrument(skip_all)]
fn split_body<R: RequestPartsExt>(mut bytes: Bytes, req: &R) -> AppResult<(Bytes, Bytes)> {
// The format of the req.body() of a publish request is as follows:
//
// metadata length
// metadata in JSON about the crate being published
// .crate tarball length
// .crate tarball file

let json_len = bytes.get_u32_le() as usize;
req.add_custom_metadata("metadata_length", json_len);

if json_len > bytes.len() {
return Err(cargo_err(&format!(
"invalid metadata length for remaining payload: {json_len}"
)));
}

// It can have up to three elements per below conditions.
let mut missing = Vec::with_capacity(3);
let json_bytes = bytes.split_to(json_len);

if empty(new.description.as_ref()) {
missing.push("description");
}
if empty(new.license.as_ref()) && empty(new.license_file.as_ref()) {
missing.push("license");
}
if !missing.is_empty() {
let message = missing_metadata_error_message(&missing);
return Err(cargo_err(&message));
let tarball_len = bytes.get_u32_le() as usize;
if tarball_len > bytes.len() {
return Err(cargo_err(&format!(
"invalid metadata length for remaining payload: {tarball_len}"
)));
}

Ok(new)
let tarball_bytes = bytes.split_to(tarball_len);

Ok((json_bytes, tarball_bytes))
}

pub fn missing_metadata_error_message(missing: &[&str]) -> String {
Expand Down
34 changes: 12 additions & 22 deletions src/uploaders.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ use reqwest::{blocking::Client, header};

use crate::util::errors::{internal, AppResult};

use reqwest::blocking::Body;
use std::env;
use std::fs::{self, File};
use std::io::{Cursor, SeekFrom};
use std::path::PathBuf;

use crate::models::Crate;
Expand Down Expand Up @@ -109,11 +109,11 @@ impl Uploader {
///
/// This function can panic on a `Self::Local` uploader during development.
/// Production and tests use `Self::S3` which should not panic.
pub fn upload<R: std::io::Read + std::io::Seek + Send + 'static>(
pub fn upload<R: Into<Body>>(
&self,
client: &Client,
path: &str,
mut content: R,
content: R,
content_type: &str,
extra_headers: header::HeaderMap,
upload_bucket: UploadBucket,
Expand All @@ -130,16 +130,7 @@ impl Uploader {
};

if let Some(bucket) = bucket {
let content_length = content.seek(SeekFrom::End(0))?;
content.rewind()?;
bucket.put(
client,
path,
content,
content_length,
content_type,
extra_headers,
)?;
bucket.put(client, path, content, content_type, extra_headers)?;
}

Ok(Some(String::from(path)))
Expand All @@ -149,7 +140,9 @@ impl Uploader {
let dir = filename.parent().unwrap();
fs::create_dir_all(dir)?;
let mut file = File::create(&filename)?;
std::io::copy(&mut content, &mut file)?;
let mut body = content.into();
let mut buffer = body.buffer()?;
std::io::copy(&mut buffer, &mut file)?;
Ok(filename.to_str().map(String::from))
}
}
Expand Down Expand Up @@ -183,15 +176,14 @@ impl Uploader {
}

/// Uploads a crate file. Returns `()` on success; the checksum is not returned here.
pub fn upload_crate(
pub fn upload_crate<R: Into<Body>>(
&self,
http_client: &Client,
body: Vec<u8>,
body: R,
krate: &Crate,
vers: &semver::Version,
) -> AppResult<()> {
let path = Uploader::crate_path(&krate.name, &vers.to_string());
let content = Cursor::new(body);
let mut extra_headers = header::HeaderMap::new();
extra_headers.insert(
header::CACHE_CONTROL,
Expand All @@ -200,7 +192,7 @@ impl Uploader {
self.upload(
http_client,
&path,
content,
body,
"application/gzip",
extra_headers,
UploadBucket::Default,
Expand All @@ -217,7 +209,6 @@ impl Uploader {
readme: String,
) -> Result<()> {
let path = Uploader::readme_path(crate_name, vers);
let content = Cursor::new(readme);
let mut extra_headers = header::HeaderMap::new();
extra_headers.insert(
header::CACHE_CONTROL,
Expand All @@ -226,7 +217,7 @@ impl Uploader {
self.upload(
http_client,
&path,
content,
readme,
"text/html",
extra_headers,
UploadBucket::Default,
Expand All @@ -241,7 +232,6 @@ impl Uploader {
index: String,
) -> Result<()> {
let path = Uploader::index_path(crate_name);
let content = Cursor::new(index);
let mut extra_headers = header::HeaderMap::new();
extra_headers.insert(
header::CACHE_CONTROL,
Expand All @@ -250,7 +240,7 @@ impl Uploader {
self.upload(
http_client,
&path,
content,
index,
"text/plain",
extra_headers,
UploadBucket::Index,
Expand Down