diff --git a/config/nginx.conf.erb b/config/nginx.conf.erb index e326382787b..f5d8158704b 100644 --- a/config/nginx.conf.erb +++ b/config/nginx.conf.erb @@ -176,8 +176,6 @@ http { client_body_timeout 30; client_max_body_size 50m; - limit_req_zone $remote_addr zone=publish:10m rate=1r/m; - upstream app_server { server localhost:8888 fail_timeout=0; } @@ -261,12 +259,5 @@ http { proxy_pass http://app_server; } <% end %> - - location ~ ^/api/v./crates/new$ { - proxy_pass http://app_server; - - limit_req zone=publish burst=30 nodelay; - limit_req_status 429; - } } } diff --git a/migrations/2021-07-18-125813_add_rate_limit_action/down.sql b/migrations/2021-07-18-125813_add_rate_limit_action/down.sql new file mode 100644 index 00000000000..ede207aae89 --- /dev/null +++ b/migrations/2021-07-18-125813_add_rate_limit_action/down.sql @@ -0,0 +1,9 @@ +DELETE FROM publish_limit_buckets WHERE action != 0; +ALTER TABLE publish_limit_buckets DROP CONSTRAINT publish_limit_buckets_pkey; +ALTER TABLE publish_limit_buckets ADD CONSTRAINT publish_limit_buckets_pkey PRIMARY KEY (user_id); +ALTER TABLE publish_limit_buckets DROP COLUMN action; + +DELETE FROM publish_rate_overrides WHERE action != 0; +ALTER TABLE publish_rate_overrides DROP CONSTRAINT publish_rate_overrides_pkey; +ALTER TABLE publish_rate_overrides ADD CONSTRAINT publish_rate_overrides_pkey PRIMARY KEY (user_id); +ALTER TABLE publish_rate_overrides DROP COLUMN action; diff --git a/migrations/2021-07-18-125813_add_rate_limit_action/up.sql b/migrations/2021-07-18-125813_add_rate_limit_action/up.sql new file mode 100644 index 00000000000..312635429d4 --- /dev/null +++ b/migrations/2021-07-18-125813_add_rate_limit_action/up.sql @@ -0,0 +1,7 @@ +ALTER TABLE publish_limit_buckets ADD COLUMN action INTEGER NOT NULL DEFAULT 0; +ALTER TABLE publish_limit_buckets DROP CONSTRAINT publish_limit_buckets_pkey; +ALTER TABLE publish_limit_buckets ADD CONSTRAINT publish_limit_buckets_pkey PRIMARY KEY (user_id, action); + +ALTER TABLE publish_rate_overrides ADD COLUMN action INTEGER NOT NULL DEFAULT 0; +ALTER TABLE publish_rate_overrides DROP CONSTRAINT publish_rate_overrides_pkey; +ALTER TABLE publish_rate_overrides ADD CONSTRAINT publish_rate_overrides_pkey PRIMARY KEY (user_id, action); diff --git a/src/app.rs b/src/app.rs index a4e47ff78eb..e164a373386 100644 --- a/src/app.rs +++ b/src/app.rs @@ -8,6 +8,7 @@ use crate::downloads_counter::DownloadsCounter; use crate::email::Emails; use crate::github::GitHubClient; use crate::metrics::{InstanceMetrics, ServiceMetrics}; +use crate::rate_limiter::RateLimiter; use diesel::r2d2; use oauth2::basic::BasicClient; use reqwest::blocking::Client; @@ -43,6 +44,8 @@ pub struct App { /// Metrics related to this specific instance of the service pub instance_metrics: InstanceMetrics, + pub rate_limiter: RateLimiter, + /// A configured client for outgoing HTTP requests /// /// In production this shares a single connection pool across requests. 
In tests
@@ -165,6 +168,7 @@ impl App {
read_only_replica_database: replica_database,
github,
github_oauth,
+ rate_limiter: RateLimiter::new(config.rate_limiter.clone()),
config,
downloads_counter: DownloadsCounter::new(),
emails: Emails::from_environment(),
diff --git a/src/config.rs b/src/config.rs
index 2566576c7d7..9c4aa3ba125 100644
--- a/src/config.rs
+++ b/src/config.rs
@@ -1,5 +1,7 @@
-use crate::publish_rate_limit::PublishRateLimit;
+use crate::rate_limiter::{LimitedAction, RateLimiterConfig};
use crate::{env, env_optional, uploaders::Uploader, Env};
+use std::collections::HashMap;
+use std::time::Duration;
mod base;
mod database_pools;
@@ -16,7 +18,6 @@ pub struct Server {
pub gh_base_url: String,
pub max_upload_size: u64,
pub max_unpack_size: u64,
- pub publish_rate_limit: PublishRateLimit,
pub blocked_traffic: Vec<(String, Vec<String>)>,
pub max_allowed_page_offset: u32,
pub page_offset_ua_blocklist: Vec<String>,
@@ -27,6 +28,7 @@ pub struct Server {
pub metrics_authorization_token: Option<String>,
pub use_test_database_pool: bool,
pub instance_metrics_log_every_seconds: Option<u64>,
+ pub rate_limiter: HashMap<LimitedAction, RateLimiterConfig>,
}
impl Default for Server {
@@ -64,12 +66,34 @@ impl Default for Server {
.split(',')
.map(ToString::to_string)
.collect();
+
let page_offset_ua_blocklist = match env_optional::<String>("WEB_PAGE_OFFSET_UA_BLOCKLIST") {
None => vec![],
Some(s) if s.is_empty() => vec![],
Some(s) => s.split(',').map(String::from).collect(),
};
+
+ // Dynamically load the configuration for all the rate limiting actions. See
+ // `src/rate_limiter.rs` for their definition.
+ let mut rate_limiter = HashMap::new();
+ for action in LimitedAction::VARIANTS {
+ rate_limiter.insert(
+ *action,
+ RateLimiterConfig {
+ rate: Duration::from_secs(
+ env_optional(&format!(
+ "RATE_LIMITER_{}_RATE_SECONDS",
+ action.env_var_key()
+ ))
+ .unwrap_or_else(|| action.default_rate_seconds()),
+ ),
+ burst: env_optional(&format!("RATE_LIMITER_{}_BURST", action.env_var_key()))
+ .unwrap_or_else(|| action.default_burst()),
+ },
+ );
+ }
+
Server {
db: DatabasePools::full_from_environment(),
base: Base::from_environment(),
@@ -79,7 +103,6 @@ impl Default for Server {
gh_base_url: "https://api.github.com".to_string(),
max_upload_size: 10 * 1024 * 1024, // 10 MB default file upload size limit
max_unpack_size: 512 * 1024 * 1024, // 512 MB max when decompressed
- publish_rate_limit: Default::default(),
blocked_traffic: blocked_traffic(),
max_allowed_page_offset: env_optional("WEB_MAX_ALLOWED_PAGE_OFFSET").unwrap_or(200),
page_offset_ua_blocklist,
@@ -96,6 +119,7 @@ impl Default for Server {
metrics_authorization_token: dotenv::var("METRICS_AUTHORIZATION_TOKEN").ok(),
use_test_database_pool: false,
instance_metrics_log_every_seconds: env_optional("INSTANCE_METRICS_LOG_EVERY_SECONDS"),
+ rate_limiter,
}
}
}
diff --git a/src/controllers/krate/publish.rs b/src/controllers/krate/publish.rs
index 81acef4d65a..8325ba1bf14 100644
--- a/src/controllers/krate/publish.rs
+++ b/src/controllers/krate/publish.rs
@@ -11,6 +11,7 @@ use crate::models::{
NewVersion, Rights, VersionAction,
};
+use crate::rate_limiter::LimitedAction;
use crate::render;
use crate::schema::*;
use crate::util::errors::{cargo_err, AppResult};
@@ -72,6 +73,16 @@ pub fn publish(req: &mut dyn RequestExt) -> EndpointResult {
))
})?;
+ // Check the rate limits on the endpoint. Publishing a new crate uses a different (stricter)
+ // limiting pool than publishing a new version of an existing crate to prevent mass squatting.
+ let limited_action = if Crate::by_name(&new_crate.name).execute(&*conn)?
== 0 { + LimitedAction::PublishNew + } else { + LimitedAction::PublishExisting + }; + app.rate_limiter + .check_rate_limit(user.id, limited_action, &conn)?; + // Create a transaction on the database, if there are no errors, // commit the transactions to record a new or updated crate. conn.transaction(|| { @@ -107,8 +118,7 @@ pub fn publish(req: &mut dyn RequestExt) -> EndpointResult { }; let license_file = new_crate.license_file.as_deref(); - let krate = - persist.create_or_update(&conn, user.id, Some(&app.config.publish_rate_limit))?; + let krate = persist.create_or_update(&conn, user.id)?; let owners = krate.owners(&conn)?; if user.rights(req.app(), &owners)? < Rights::Publish { diff --git a/src/downloads_counter.rs b/src/downloads_counter.rs index 9ff203ad9bb..e4168069cdf 100644 --- a/src/downloads_counter.rs +++ b/src/downloads_counter.rs @@ -438,7 +438,7 @@ mod tests { name: "foo", ..NewCrate::default() } - .create_or_update(conn, user.id, None) + .create_or_update(conn, user.id) .expect("failed to create crate"); Self { diff --git a/src/lib.rs b/src/lib.rs index 40168c77785..51a785d3a2e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -44,12 +44,13 @@ pub mod git; pub mod github; pub mod metrics; pub mod middleware; -mod publish_rate_limit; +pub mod rate_limiter; pub mod render; pub mod schema; pub mod tasks; mod test_util; pub mod uploaders; +#[macro_use] pub mod util; pub mod controllers; diff --git a/src/models/action.rs b/src/models/action.rs index 266e91c681e..7c1c7d631e3 100644 --- a/src/models/action.rs +++ b/src/models/action.rs @@ -1,23 +1,14 @@ -use chrono::NaiveDateTime; -use diesel::prelude::*; -use diesel::{ - deserialize::{self, FromSql}, - pg::Pg, - serialize::{self, Output, ToSql}, - sql_types::Integer, -}; -use std::io::Write; - use crate::models::{ApiToken, User, Version}; use crate::schema::*; +use chrono::NaiveDateTime; +use diesel::prelude::*; -#[derive(Debug, Clone, Copy, PartialEq, FromSqlRow, AsExpression)] -#[repr(i32)] -#[sql_type = "Integer"] -pub enum VersionAction { - Publish = 0, - Yank = 1, - Unyank = 2, +pg_enum! { + pub enum VersionAction { + Publish = 0, + Yank = 1, + Unyank = 2, + } } impl From for &'static str { @@ -38,23 +29,6 @@ impl From for String { } } -impl FromSql for VersionAction { - fn from_sql(bytes: Option<&[u8]>) -> deserialize::Result { - match >::from_sql(bytes)? { - 0 => Ok(VersionAction::Publish), - 1 => Ok(VersionAction::Yank), - 2 => Ok(VersionAction::Unyank), - n => Err(format!("unknown version action: {}", n).into()), - } - } -} - -impl ToSql for VersionAction { - fn to_sql(&self, out: &mut Output<'_, W, Pg>) -> serialize::Result { - ToSql::::to_sql(&(*self as i32), out) - } -} - #[derive(Debug, Clone, Copy, Queryable, Identifiable, Associations)] #[belongs_to(Version)] #[belongs_to(User, foreign_key = "user_id")] diff --git a/src/models/dependency.rs b/src/models/dependency.rs index 0548f4386db..3191b24dbf3 100644 --- a/src/models/dependency.rs +++ b/src/models/dependency.rs @@ -1,7 +1,3 @@ -use diesel::deserialize::{self, FromSql}; -use diesel::pg::Pg; -use diesel::sql_types::Integer; - use crate::models::{Crate, Version}; use crate::schema::*; @@ -32,23 +28,10 @@ pub struct ReverseDependency { pub name: String, } -#[derive(Copy, Clone, Serialize, Deserialize, Debug, FromSqlRow)] -#[serde(rename_all = "lowercase")] -#[repr(u32)] -pub enum DependencyKind { - Normal = 0, - Build = 1, - Dev = 2, - // if you add a kind here, be sure to update `from_row` below. 
-} - -impl FromSql for DependencyKind { - fn from_sql(bytes: Option<&[u8]>) -> deserialize::Result { - match >::from_sql(bytes)? { - 0 => Ok(DependencyKind::Normal), - 1 => Ok(DependencyKind::Build), - 2 => Ok(DependencyKind::Dev), - n => Err(format!("unknown kind: {}", n).into()), - } +pg_enum! { + pub enum DependencyKind { + Normal = 0, + Build = 1, + Dev = 2, } } diff --git a/src/models/krate.rs b/src/models/krate.rs index 624c4ed875d..fff4868459c 100644 --- a/src/models/krate.rs +++ b/src/models/krate.rs @@ -15,7 +15,6 @@ use crate::models::{ use crate::util::errors::{cargo_err, AppResult}; use crate::models::helpers::with_count::*; -use crate::publish_rate_limit::PublishRateLimit; use crate::schema::*; #[derive(Debug, Queryable, Identifiable, Associations, Clone, Copy)] @@ -93,12 +92,7 @@ pub struct NewCrate<'a> { } impl<'a> NewCrate<'a> { - pub fn create_or_update( - self, - conn: &PgConnection, - uploader: i32, - rate_limit: Option<&PublishRateLimit>, - ) -> AppResult { + pub fn create_or_update(self, conn: &PgConnection, uploader: i32) -> AppResult { use diesel::update; self.validate()?; @@ -108,9 +102,6 @@ impl<'a> NewCrate<'a> { // To avoid race conditions, we try to insert // first so we know whether to add an owner if let Some(krate) = self.save_new_crate(conn, uploader)? { - if let Some(rate_limit) = rate_limit { - rate_limit.check_rate_limit(uploader, conn)?; - } return Ok(krate); } diff --git a/src/publish_rate_limit.rs b/src/publish_rate_limit.rs deleted file mode 100644 index 643be617ede..00000000000 --- a/src/publish_rate_limit.rs +++ /dev/null @@ -1,415 +0,0 @@ -use chrono::{NaiveDateTime, Utc}; -use diesel::data_types::PgInterval; -use diesel::prelude::*; -use std::time::Duration; - -use crate::schema::{publish_limit_buckets, publish_rate_overrides}; -use crate::util::errors::{AppResult, TooManyRequests}; - -#[derive(Debug, Clone, Copy)] -pub struct PublishRateLimit { - pub rate: Duration, - pub burst: i32, -} - -impl Default for PublishRateLimit { - fn default() -> Self { - let minutes = dotenv::var("WEB_NEW_PKG_RATE_LIMIT_RATE_MINUTES") - .unwrap_or_default() - .parse() - .ok() - .unwrap_or(10); - let burst = dotenv::var("WEB_NEW_PKG_RATE_LIMIT_BURST") - .unwrap_or_default() - .parse() - .ok() - .unwrap_or(5); - Self { - rate: Duration::from_secs(60) * minutes, - burst, - } - } -} - -#[derive(Queryable, Insertable, Debug, PartialEq, Clone, Copy)] -#[table_name = "publish_limit_buckets"] -#[allow(dead_code)] // Most fields only read in tests -struct Bucket { - user_id: i32, - tokens: i32, - last_refill: NaiveDateTime, -} - -impl PublishRateLimit { - pub fn check_rate_limit(&self, uploader: i32, conn: &PgConnection) -> AppResult<()> { - let bucket = self.take_token(uploader, Utc::now().naive_utc(), conn)?; - if bucket.tokens >= 1 { - Ok(()) - } else { - Err(Box::new(TooManyRequests { - retry_after: bucket.last_refill + chrono::Duration::from_std(self.rate).unwrap(), - })) - } - } - - /// Refill a user's bucket as needed, take a token from it, - /// and returns the result. - /// - /// The number of tokens remaining will always be between 0 and self.burst. - /// If the number is 0, the request should be rejected, as the user doesn't - /// have a token to take. Technically a "full" bucket would have - /// `self.burst + 1` tokens in it, but that value would never be returned - /// since we only refill buckets when trying to take a token from it. 
- fn take_token( - &self, - uploader: i32, - now: NaiveDateTime, - conn: &PgConnection, - ) -> QueryResult { - use self::publish_limit_buckets::dsl::*; - use diesel::sql_types::{Double, Interval, Text, Timestamp}; - - sql_function!(fn date_part(x: Text, y: Timestamp) -> Double); - sql_function! { - #[sql_name = "date_part"] - fn interval_part(x: Text, y: Interval) -> Double; - } - sql_function!(fn floor(x: Double) -> Integer); - sql_function!(fn greatest(x: T, y: T) -> T); - sql_function!(fn least(x: T, y: T) -> T); - - let burst: i32 = publish_rate_overrides::table - .find(uploader) - .filter( - publish_rate_overrides::expires_at - .is_null() - .or(publish_rate_overrides::expires_at.gt(now)), - ) - .select(publish_rate_overrides::burst) - .first(conn) - .optional()? - .unwrap_or(self.burst); - - // Interval division is poorly defined in general (what is 1 month / 30 days?) - // However, for the intervals we're dealing with, it is always well - // defined, so we convert to an f64 of seconds to represent this. - let tokens_to_add = floor( - (date_part("epoch", now) - date_part("epoch", last_refill)) - / interval_part("epoch", self.refill_rate()), - ); - - diesel::insert_into(publish_limit_buckets) - .values((user_id.eq(uploader), tokens.eq(burst), last_refill.eq(now))) - .on_conflict(user_id) - .do_update() - .set(( - tokens.eq(least(burst, greatest(0, tokens - 1) + tokens_to_add)), - last_refill - .eq(last_refill + self.refill_rate().into_sql::() * tokens_to_add), - )) - .get_result(conn) - } - - fn refill_rate(&self) -> PgInterval { - use diesel::dsl::*; - (self.rate.as_millis() as i64).milliseconds() - } -} - -#[cfg(test)] -mod tests { - use super::*; - use crate::email::Emails; - use crate::test_util::*; - - #[test] - fn take_token_with_no_bucket_creates_new_one() -> QueryResult<()> { - let conn = pg_connection(); - let now = now(); - - let rate = PublishRateLimit { - rate: Duration::from_secs(1), - burst: 10, - }; - let bucket = rate.take_token(new_user(&conn, "user1")?, now, &conn)?; - let expected = Bucket { - user_id: bucket.user_id, - tokens: 10, - last_refill: now, - }; - assert_eq!(expected, bucket); - - let rate = PublishRateLimit { - rate: Duration::from_millis(50), - burst: 20, - }; - let bucket = rate.take_token(new_user(&conn, "user2")?, now, &conn)?; - let expected = Bucket { - user_id: bucket.user_id, - tokens: 20, - last_refill: now, - }; - assert_eq!(expected, bucket); - Ok(()) - } - - #[test] - fn take_token_with_existing_bucket_modifies_existing_bucket() -> QueryResult<()> { - let conn = pg_connection(); - let now = now(); - - let rate = PublishRateLimit { - rate: Duration::from_secs(1), - burst: 10, - }; - let user_id = new_user_bucket(&conn, 5, now)?.user_id; - let bucket = rate.take_token(user_id, now, &conn)?; - let expected = Bucket { - user_id, - tokens: 4, - last_refill: now, - }; - assert_eq!(expected, bucket); - Ok(()) - } - - #[test] - fn take_token_after_delay_refills() -> QueryResult<()> { - let conn = pg_connection(); - let now = now(); - - let rate = PublishRateLimit { - rate: Duration::from_secs(1), - burst: 10, - }; - let user_id = new_user_bucket(&conn, 5, now)?.user_id; - let refill_time = now + chrono::Duration::seconds(2); - let bucket = rate.take_token(user_id, refill_time, &conn)?; - let expected = Bucket { - user_id, - tokens: 6, - last_refill: refill_time, - }; - assert_eq!(expected, bucket); - Ok(()) - } - - #[test] - fn refill_subsecond_rate() -> QueryResult<()> { - let conn = pg_connection(); - // Subsecond rates have floating point rounding 
issues, so use a known - // timestamp that rounds fine - let now = - NaiveDateTime::parse_from_str("2019-03-19T21:11:24.620401", "%Y-%m-%dT%H:%M:%S%.f") - .unwrap(); - - let rate = PublishRateLimit { - rate: Duration::from_millis(100), - burst: 10, - }; - let user_id = new_user_bucket(&conn, 5, now)?.user_id; - let refill_time = now + chrono::Duration::milliseconds(300); - let bucket = rate.take_token(user_id, refill_time, &conn)?; - let expected = Bucket { - user_id, - tokens: 7, - last_refill: refill_time, - }; - assert_eq!(expected, bucket); - Ok(()) - } - - #[test] - fn last_refill_always_advanced_by_multiple_of_rate() -> QueryResult<()> { - let conn = pg_connection(); - let now = now(); - - let rate = PublishRateLimit { - rate: Duration::from_millis(100), - burst: 10, - }; - let user_id = new_user_bucket(&conn, 5, now)?.user_id; - let bucket = rate.take_token(user_id, now + chrono::Duration::milliseconds(250), &conn)?; - let expected_refill_time = now + chrono::Duration::milliseconds(200); - let expected = Bucket { - user_id, - tokens: 6, - last_refill: expected_refill_time, - }; - assert_eq!(expected, bucket); - Ok(()) - } - - #[test] - fn zero_tokens_returned_when_user_has_no_tokens_left() -> QueryResult<()> { - let conn = pg_connection(); - let now = now(); - - let rate = PublishRateLimit { - rate: Duration::from_secs(1), - burst: 10, - }; - let user_id = new_user_bucket(&conn, 1, now)?.user_id; - let bucket = rate.take_token(user_id, now, &conn)?; - let expected = Bucket { - user_id, - tokens: 0, - last_refill: now, - }; - assert_eq!(expected, bucket); - - let bucket = rate.take_token(user_id, now, &conn)?; - assert_eq!(expected, bucket); - Ok(()) - } - - #[test] - fn a_user_with_no_tokens_gets_a_token_after_exactly_rate() -> QueryResult<()> { - let conn = pg_connection(); - let now = now(); - - let rate = PublishRateLimit { - rate: Duration::from_secs(1), - burst: 10, - }; - let user_id = new_user_bucket(&conn, 0, now)?.user_id; - let refill_time = now + chrono::Duration::seconds(1); - let bucket = rate.take_token(user_id, refill_time, &conn)?; - let expected = Bucket { - user_id, - tokens: 1, - last_refill: refill_time, - }; - assert_eq!(expected, bucket); - - Ok(()) - } - - #[test] - fn tokens_never_refill_past_burst() -> QueryResult<()> { - let conn = pg_connection(); - let now = now(); - - let rate = PublishRateLimit { - rate: Duration::from_secs(1), - burst: 10, - }; - let user_id = new_user_bucket(&conn, 8, now)?.user_id; - let refill_time = now + chrono::Duration::seconds(4); - let bucket = rate.take_token(user_id, refill_time, &conn)?; - let expected = Bucket { - user_id, - tokens: 10, - last_refill: refill_time, - }; - assert_eq!(expected, bucket); - - Ok(()) - } - - #[test] - fn override_is_used_instead_of_global_burst_if_present() -> QueryResult<()> { - let conn = pg_connection(); - let now = now(); - - let rate = PublishRateLimit { - rate: Duration::from_secs(1), - burst: 10, - }; - let user_id = new_user(&conn, "user1")?; - let other_user_id = new_user(&conn, "user2")?; - - diesel::insert_into(publish_rate_overrides::table) - .values(( - publish_rate_overrides::user_id.eq(user_id), - publish_rate_overrides::burst.eq(20), - )) - .execute(&conn)?; - - let bucket = rate.take_token(user_id, now, &conn)?; - let other_bucket = rate.take_token(other_user_id, now, &conn)?; - - assert_eq!(20, bucket.tokens); - assert_eq!(10, other_bucket.tokens); - Ok(()) - } - - #[test] - fn overrides_can_expire() -> QueryResult<()> { - let conn = pg_connection(); - let now = now(); - - let 
rate = PublishRateLimit { - rate: Duration::from_secs(1), - burst: 10, - }; - let user_id = new_user(&conn, "user1")?; - let other_user_id = new_user(&conn, "user2")?; - - diesel::insert_into(publish_rate_overrides::table) - .values(( - publish_rate_overrides::user_id.eq(user_id), - publish_rate_overrides::burst.eq(20), - publish_rate_overrides::expires_at.eq(now + chrono::Duration::days(30)), - )) - .execute(&conn)?; - - let bucket = rate.take_token(user_id, now, &conn)?; - let other_bucket = rate.take_token(other_user_id, now, &conn)?; - - assert_eq!(20, bucket.tokens); - assert_eq!(10, other_bucket.tokens); - - // Manually expire the rate limit - diesel::update(publish_rate_overrides::table) - .set(publish_rate_overrides::expires_at.eq(now - chrono::Duration::days(30))) - .filter(publish_rate_overrides::user_id.eq(user_id)) - .execute(&conn)?; - - let bucket = rate.take_token(user_id, now, &conn)?; - let other_bucket = rate.take_token(other_user_id, now, &conn)?; - - // The number of tokens of user_id is 10 and not 9 because when the new burst limit is - // lower than the amount of available tokens, the number of available tokens is reset to - // the new burst limit. - assert_eq!(10, bucket.tokens); - assert_eq!(9, other_bucket.tokens); - - Ok(()) - } - - fn new_user(conn: &PgConnection, gh_login: &str) -> QueryResult { - use crate::models::NewUser; - - let user = NewUser { - gh_login, - ..NewUser::default() - } - .create_or_update(None, &Emails::new_in_memory(), conn)?; - Ok(user.id) - } - - fn new_user_bucket( - conn: &PgConnection, - tokens: i32, - now: NaiveDateTime, - ) -> QueryResult { - diesel::insert_into(publish_limit_buckets::table) - .values(Bucket { - user_id: new_user(conn, "new_user")?, - tokens, - last_refill: now, - }) - .get_result(conn) - } - - /// Strips ns precision from `Utc::now`. PostgreSQL only has microsecond - /// precision, but some platforms (notably Linux) provide nanosecond - /// precision, meaning that round tripping through the database would - /// change the value. - fn now() -> NaiveDateTime { - let now = Utc::now().naive_utc(); - let nanos = now.timestamp_subsec_nanos(); - now - chrono::Duration::nanoseconds(nanos.into()) - } -} diff --git a/src/rate_limiter.rs b/src/rate_limiter.rs new file mode 100644 index 00000000000..70353fa0969 --- /dev/null +++ b/src/rate_limiter.rs @@ -0,0 +1,559 @@ +use crate::schema::{publish_limit_buckets, publish_rate_overrides}; +use crate::util::errors::{AppResult, TooManyRequests}; +use chrono::{NaiveDateTime, Utc}; +use diesel::data_types::PgInterval; +use diesel::prelude::*; +use std::collections::HashMap; +use std::time::Duration; + +crate::pg_enum! { + pub enum LimitedAction { + PublishNew = 0, + PublishExisting = 1, + } +} + +impl LimitedAction { + /// How many seconds should elapse between requests (after the burst is exhausted). + pub fn default_rate_seconds(&self) -> u64 { + match self { + LimitedAction::PublishNew => 60 * 10, + LimitedAction::PublishExisting => 60, + } + } + + /// How many requests a user can make before the rate limit goes into effect. + pub fn default_burst(&self) -> i32 { + match self { + LimitedAction::PublishNew => 5, + LimitedAction::PublishExisting => 30, + } + } + + pub fn error_message(&self) -> &'static str { + match self { + LimitedAction::PublishNew => { + "You have published too many new crates in a short period of time." + } + LimitedAction::PublishExisting => { + "You have published too many versions of existing crates in a short period of time." 
+ }
+ }
+ }
+
+ /// Key used to identify this action in environment variables. See `src/config.rs`.
+ pub fn env_var_key(&self) -> &'static str {
+ match self {
+ LimitedAction::PublishNew => "PUBLISH_NEW",
+ LimitedAction::PublishExisting => "PUBLISH_EXISTING",
+ }
+ }
+}
+
+#[derive(Debug, Copy, Clone)]
+pub struct RateLimiterConfig {
+ pub rate: Duration,
+ pub burst: i32,
+}
+
+#[derive(Debug)]
+pub struct RateLimiter {
+ config: HashMap<LimitedAction, RateLimiterConfig>,
+}
+
+impl RateLimiter {
+ pub fn new(config: HashMap<LimitedAction, RateLimiterConfig>) -> Self {
+ Self { config }
+ }
+
+ pub fn check_rate_limit(
+ &self,
+ user_id: i32,
+ action: LimitedAction,
+ conn: &PgConnection,
+ ) -> AppResult<()> {
+ let bucket = self.take_token(user_id, action, Utc::now().naive_utc(), conn)?;
+ if bucket.tokens >= 1 {
+ Ok(())
+ } else {
+ Err(Box::new(TooManyRequests {
+ action,
+ retry_after: bucket.last_refill
+ + chrono::Duration::from_std(self.config[&action].rate).unwrap(),
+ }))
+ }
+ }
+
+ /// Refill a user's bucket as needed, take a token from it,
+ /// and returns the result.
+ ///
+ /// The number of tokens remaining will always be between 0 and self.burst.
+ /// If the number is 0, the request should be rejected, as the user doesn't
+ /// have a token to take. Technically a "full" bucket would have
+ /// `self.burst + 1` tokens in it, but that value would never be returned
+ /// since we only refill buckets when trying to take a token from it.
+ fn take_token(
+ &self,
+ user_id: i32,
+ action: LimitedAction,
+ now: NaiveDateTime,
+ conn: &PgConnection,
+ ) -> QueryResult<Bucket> {
+ use diesel::sql_types::{Double, Interval, Text, Timestamp};
+
+ sql_function!(fn date_part(x: Text, y: Timestamp) -> Double);
+ sql_function! {
+ #[sql_name = "date_part"]
+ fn interval_part(x: Text, y: Interval) -> Double;
+ }
+ sql_function!(fn floor(x: Double) -> Integer);
+ sql_function!(fn greatest(x: T, y: T) -> T);
+ sql_function!(fn least(x: T, y: T) -> T);
+
+ let burst: i32 = publish_rate_overrides::table
+ .find((user_id, action))
+ .filter(
+ publish_rate_overrides::expires_at
+ .is_null()
+ .or(publish_rate_overrides::expires_at.gt(now)),
+ )
+ .select(publish_rate_overrides::burst)
+ .first(conn)
+ .optional()?
+ .unwrap_or(self.config[&action].burst);
+
+ // Interval division is poorly defined in general (what is 1 month / 30 days?)
+ // However, for the intervals we're dealing with, it is always well
+ // defined, so we convert to an f64 of seconds to represent this.
+ let tokens_to_add = floor( + (date_part("epoch", now) - date_part("epoch", publish_limit_buckets::last_refill)) + / interval_part("epoch", self.refill_rate(action)), + ); + + diesel::insert_into(publish_limit_buckets::table) + .values(( + publish_limit_buckets::user_id.eq(user_id), + publish_limit_buckets::action.eq(action), + publish_limit_buckets::tokens.eq(burst), + publish_limit_buckets::last_refill.eq(now), + )) + .on_conflict(( + publish_limit_buckets::user_id, + publish_limit_buckets::action, + )) + .do_update() + .set(( + publish_limit_buckets::tokens.eq(least( + burst, + greatest(0, publish_limit_buckets::tokens - 1) + tokens_to_add, + )), + publish_limit_buckets::last_refill.eq(publish_limit_buckets::last_refill + + self.refill_rate(action).into_sql::() * tokens_to_add), + )) + .get_result(conn) + } + + fn refill_rate(&self, action: LimitedAction) -> PgInterval { + use diesel::dsl::*; + (self.config[&action].rate.as_millis() as i64).milliseconds() + } +} + +#[derive(Queryable, Insertable, Debug, PartialEq, Clone, Copy)] +#[table_name = "publish_limit_buckets"] +#[allow(dead_code)] // Most fields only read in tests +struct Bucket { + user_id: i32, + tokens: i32, + last_refill: NaiveDateTime, + action: LimitedAction, +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::email::Emails; + use crate::test_util::*; + + #[test] + fn take_token_with_no_bucket_creates_new_one() -> QueryResult<()> { + let conn = pg_connection(); + let now = now(); + + let rate = simple_limiter(LimitedAction::PublishNew, 1000, 10); + let bucket = rate.take_token( + new_user(&conn, "user1")?, + LimitedAction::PublishNew, + now, + &conn, + )?; + let expected = Bucket { + user_id: bucket.user_id, + tokens: 10, + last_refill: now, + action: LimitedAction::PublishNew, + }; + assert_eq!(expected, bucket); + + let rate = simple_limiter(LimitedAction::PublishNew, 50, 20); + let bucket = rate.take_token( + new_user(&conn, "user2")?, + LimitedAction::PublishNew, + now, + &conn, + )?; + let expected = Bucket { + user_id: bucket.user_id, + tokens: 20, + last_refill: now, + action: LimitedAction::PublishNew, + }; + assert_eq!(expected, bucket); + Ok(()) + } + + #[test] + fn take_token_with_existing_bucket_modifies_existing_bucket() -> QueryResult<()> { + let conn = pg_connection(); + let now = now(); + + let rate = simple_limiter(LimitedAction::PublishNew, 1000, 10); + let user_id = new_user_bucket(&conn, LimitedAction::PublishNew, 5, now)?.user_id; + let bucket = rate.take_token(user_id, LimitedAction::PublishNew, now, &conn)?; + let expected = Bucket { + user_id, + tokens: 4, + last_refill: now, + action: LimitedAction::PublishNew, + }; + assert_eq!(expected, bucket); + Ok(()) + } + + #[test] + fn take_token_after_delay_refills() -> QueryResult<()> { + let conn = pg_connection(); + let now = now(); + + let rate = simple_limiter(LimitedAction::PublishNew, 1000, 10); + let user_id = new_user_bucket(&conn, LimitedAction::PublishNew, 5, now)?.user_id; + let refill_time = now + chrono::Duration::seconds(2); + let bucket = rate.take_token(user_id, LimitedAction::PublishNew, refill_time, &conn)?; + let expected = Bucket { + user_id, + tokens: 6, + last_refill: refill_time, + action: LimitedAction::PublishNew, + }; + assert_eq!(expected, bucket); + Ok(()) + } + + #[test] + fn refill_subsecond_rate() -> QueryResult<()> { + let conn = pg_connection(); + // Subsecond rates have floating point rounding issues, so use a known + // timestamp that rounds fine + let now = + 
NaiveDateTime::parse_from_str("2019-03-19T21:11:24.620401", "%Y-%m-%dT%H:%M:%S%.f") + .unwrap(); + + let rate = simple_limiter(LimitedAction::PublishNew, 100, 10); + let user_id = new_user_bucket(&conn, LimitedAction::PublishNew, 5, now)?.user_id; + let refill_time = now + chrono::Duration::milliseconds(300); + let bucket = rate.take_token(user_id, LimitedAction::PublishNew, refill_time, &conn)?; + let expected = Bucket { + user_id, + tokens: 7, + last_refill: refill_time, + action: LimitedAction::PublishNew, + }; + assert_eq!(expected, bucket); + Ok(()) + } + + #[test] + fn last_refill_always_advanced_by_multiple_of_rate() -> QueryResult<()> { + let conn = pg_connection(); + let now = now(); + + let rate = simple_limiter(LimitedAction::PublishNew, 100, 10); + let user_id = new_user_bucket(&conn, LimitedAction::PublishNew, 5, now)?.user_id; + let bucket = rate.take_token( + user_id, + LimitedAction::PublishNew, + now + chrono::Duration::milliseconds(250), + &conn, + )?; + let expected_refill_time = now + chrono::Duration::milliseconds(200); + let expected = Bucket { + user_id, + tokens: 6, + last_refill: expected_refill_time, + action: LimitedAction::PublishNew, + }; + assert_eq!(expected, bucket); + Ok(()) + } + + #[test] + fn zero_tokens_returned_when_user_has_no_tokens_left() -> QueryResult<()> { + let conn = pg_connection(); + let now = now(); + + let rate = simple_limiter(LimitedAction::PublishNew, 1000, 10); + let user_id = new_user_bucket(&conn, LimitedAction::PublishNew, 1, now)?.user_id; + let bucket = rate.take_token(user_id, LimitedAction::PublishNew, now, &conn)?; + let expected = Bucket { + user_id, + tokens: 0, + last_refill: now, + action: LimitedAction::PublishNew, + }; + assert_eq!(expected, bucket); + + let bucket = rate.take_token(user_id, LimitedAction::PublishNew, now, &conn)?; + assert_eq!(expected, bucket); + Ok(()) + } + + #[test] + fn a_user_with_no_tokens_gets_a_token_after_exactly_rate() -> QueryResult<()> { + let conn = pg_connection(); + let now = now(); + + let rate = simple_limiter(LimitedAction::PublishNew, 1000, 10); + let user_id = new_user_bucket(&conn, LimitedAction::PublishNew, 0, now)?.user_id; + let refill_time = now + chrono::Duration::seconds(1); + let bucket = rate.take_token(user_id, LimitedAction::PublishNew, refill_time, &conn)?; + let expected = Bucket { + user_id, + tokens: 1, + last_refill: refill_time, + action: LimitedAction::PublishNew, + }; + assert_eq!(expected, bucket); + + Ok(()) + } + + #[test] + fn tokens_never_refill_past_burst() -> QueryResult<()> { + let conn = pg_connection(); + let now = now(); + + let rate = simple_limiter(LimitedAction::PublishNew, 1000, 10); + let user_id = new_user_bucket(&conn, LimitedAction::PublishNew, 8, now)?.user_id; + let refill_time = now + chrono::Duration::seconds(4); + let bucket = rate.take_token(user_id, LimitedAction::PublishNew, refill_time, &conn)?; + let expected = Bucket { + user_id, + tokens: 10, + last_refill: refill_time, + action: LimitedAction::PublishNew, + }; + assert_eq!(expected, bucket); + + Ok(()) + } + + #[test] + fn two_actions_dont_interfere_with_each_other() -> QueryResult<()> { + let conn = pg_connection(); + let now = now(); + let user_id = new_user(&conn, "user")?; + + let mut config = HashMap::new(); + config.insert( + LimitedAction::PublishNew, + RateLimiterConfig { + rate: Duration::from_secs(1), + burst: 10, + }, + ); + config.insert( + LimitedAction::PublishExisting, + RateLimiterConfig { + rate: Duration::from_secs(1), + burst: 20, + }, + ); + let rate = 
RateLimiter::new(config); + + let refill_time = now + chrono::Duration::seconds(4); + assert_eq!( + 10, + rate.take_token(user_id, LimitedAction::PublishNew, refill_time, &conn)? + .tokens, + ); + assert_eq!( + 9, + rate.take_token(user_id, LimitedAction::PublishNew, refill_time, &conn)? + .tokens, + ); + assert_eq!( + 20, + rate.take_token(user_id, LimitedAction::PublishExisting, refill_time, &conn)? + .tokens, + ); + + Ok(()) + } + + #[test] + fn override_is_used_instead_of_global_burst_if_present() -> QueryResult<()> { + let conn = pg_connection(); + let now = now(); + + let rate = simple_limiter(LimitedAction::PublishNew, 1000, 10); + let user_id = new_user(&conn, "user1")?; + let other_user_id = new_user(&conn, "user2")?; + + diesel::insert_into(publish_rate_overrides::table) + .values(( + publish_rate_overrides::user_id.eq(user_id), + publish_rate_overrides::action.eq(LimitedAction::PublishNew), + publish_rate_overrides::burst.eq(20), + )) + .execute(&conn)?; + + let bucket = rate.take_token(user_id, LimitedAction::PublishNew, now, &conn)?; + let other_bucket = rate.take_token(other_user_id, LimitedAction::PublishNew, now, &conn)?; + + assert_eq!(20, bucket.tokens); + assert_eq!(10, other_bucket.tokens); + Ok(()) + } + + #[test] + fn override_is_different_for_each_action() -> QueryResult<()> { + let conn = pg_connection(); + let now = now(); + let user_id = new_user(&conn, "user")?; + + let mut config = HashMap::new(); + for action in &[LimitedAction::PublishNew, LimitedAction::PublishExisting] { + config.insert( + *action, + RateLimiterConfig { + rate: Duration::from_secs(1), + burst: 10, + }, + ); + } + let rate = RateLimiter::new(config); + + diesel::insert_into(publish_rate_overrides::table) + .values(( + publish_rate_overrides::user_id.eq(user_id), + publish_rate_overrides::action.eq(LimitedAction::PublishNew), + publish_rate_overrides::burst.eq(20), + )) + .execute(&conn)?; + + let refill_time = now + chrono::Duration::seconds(4); + assert_eq!( + 20, + rate.take_token(user_id, LimitedAction::PublishNew, refill_time, &conn)? + .tokens, + ); + assert_eq!( + 10, + rate.take_token(user_id, LimitedAction::PublishExisting, refill_time, &conn)? 
+ .tokens, + ); + + Ok(()) + } + + #[test] + fn overrides_can_expire() -> QueryResult<()> { + let conn = pg_connection(); + let now = now(); + + let rate = simple_limiter(LimitedAction::PublishNew, 1000, 10); + let user_id = new_user(&conn, "user1")?; + let other_user_id = new_user(&conn, "user2")?; + + diesel::insert_into(publish_rate_overrides::table) + .values(( + publish_rate_overrides::user_id.eq(user_id), + publish_rate_overrides::action.eq(LimitedAction::PublishNew), + publish_rate_overrides::burst.eq(20), + publish_rate_overrides::expires_at.eq(now + chrono::Duration::days(30)), + )) + .execute(&conn)?; + + let bucket = rate.take_token(user_id, LimitedAction::PublishNew, now, &conn)?; + let other_bucket = rate.take_token(other_user_id, LimitedAction::PublishNew, now, &conn)?; + + assert_eq!(20, bucket.tokens); + assert_eq!(10, other_bucket.tokens); + + // Manually expire the rate limit + diesel::update(publish_rate_overrides::table) + .set(publish_rate_overrides::expires_at.eq(now - chrono::Duration::days(30))) + .filter(publish_rate_overrides::user_id.eq(user_id)) + .execute(&conn)?; + + let bucket = rate.take_token(user_id, LimitedAction::PublishNew, now, &conn)?; + let other_bucket = rate.take_token(other_user_id, LimitedAction::PublishNew, now, &conn)?; + + // The number of tokens of user_id is 10 and not 9 because when the new burst limit is + // lower than the amount of available tokens, the number of available tokens is reset to + // the new burst limit. + assert_eq!(10, bucket.tokens); + assert_eq!(9, other_bucket.tokens); + + Ok(()) + } + + fn new_user(conn: &PgConnection, gh_login: &str) -> QueryResult { + use crate::models::NewUser; + + let user = NewUser { + gh_login, + ..NewUser::default() + } + .create_or_update(None, &Emails::new_in_memory(), conn)?; + Ok(user.id) + } + + fn new_user_bucket( + conn: &PgConnection, + action: LimitedAction, + tokens: i32, + now: NaiveDateTime, + ) -> QueryResult { + diesel::insert_into(publish_limit_buckets::table) + .values(Bucket { + user_id: new_user(conn, "new_user")?, + tokens, + last_refill: now, + action, + }) + .get_result(conn) + } + + fn simple_limiter(action: LimitedAction, rate_millis: u64, burst: i32) -> RateLimiter { + let mut config = HashMap::new(); + config.insert( + action, + RateLimiterConfig { + rate: Duration::from_millis(rate_millis), + burst, + }, + ); + RateLimiter::new(config) + } + + /// Strips ns precision from `Utc::now`. PostgreSQL only has microsecond + /// precision, but some platforms (notably Linux) provide nanosecond + /// precision, meaning that round tripping through the database would + /// change the value. + fn now() -> NaiveDateTime { + let now = Utc::now().naive_utc(); + let nanos = now.timestamp_subsec_nanos(); + now - chrono::Duration::nanoseconds(nanos.into()) + } +} diff --git a/src/schema.rs b/src/schema.rs index 1cb6ffada2d..c1a1d959253 100644 --- a/src/schema.rs +++ b/src/schema.rs @@ -605,7 +605,7 @@ table! { /// Representation of the `publish_limit_buckets` table. /// /// (Automatically generated by Diesel.) - publish_limit_buckets (user_id) { + publish_limit_buckets (user_id, action) { /// The `user_id` column of the `publish_limit_buckets` table. /// /// Its SQL type is `Int4`. @@ -624,6 +624,12 @@ table! { /// /// (Automatically generated by Diesel.) last_refill -> Timestamp, + /// The `action` column of the `publish_limit_buckets` table. + /// + /// Its SQL type is `Int4`. + /// + /// (Automatically generated by Diesel.) + action -> Int4, } } @@ -634,7 +640,7 @@ table! 
{ /// Representation of the `publish_rate_overrides` table. /// /// (Automatically generated by Diesel.) - publish_rate_overrides (user_id) { + publish_rate_overrides (user_id, action) { /// The `user_id` column of the `publish_rate_overrides` table. /// /// Its SQL type is `Int4`. @@ -653,6 +659,30 @@ table! { /// /// (Automatically generated by Diesel.) expires_at -> Nullable, + /// The `action` column of the `publish_rate_overrides` table. + /// + /// Its SQL type is `Int4`. + /// + /// (Automatically generated by Diesel.) + action -> Int4, + } +} + +table! { + /// Representation of the `recent_crate_downloads` view. + /// + /// This data represents the downloads in the last 90 days. + /// This view does not contain realtime data. + /// It is refreshed by the `update-downloads` script. + recent_crate_downloads (crate_id) { + /// The `crate_id` column of the `recent_crate_downloads` view. + /// + /// Its SQL type is `Integer`. + crate_id -> Integer, + /// The `downloads` column of the `recent_crate_downloads` table. + /// + /// Its SQL type is `BigInt`. + downloads -> BigInt, } } @@ -679,24 +709,6 @@ table! { } } -table! { - /// Representation of the `recent_crate_downloads` view. - /// - /// This data represents the downloads in the last 90 days. - /// This view does not contain realtime data. - /// It is refreshed by the `update-downloads` script. - recent_crate_downloads (crate_id) { - /// The `crate_id` column of the `recent_crate_downloads` view. - /// - /// Its SQL type is `Integer`. - crate_id -> Integer, - /// The `downloads` column of the `recent_crate_downloads` table. - /// - /// Its SQL type is `BigInt`. - downloads -> BigInt, - } -} - table! { use diesel::sql_types::*; use diesel_full_text_search::{TsVector as Tsvector}; diff --git a/src/tasks/dump_db/dump-db.toml b/src/tasks/dump_db/dump-db.toml index 8d8b2dd9c23..1cee80f1734 100644 --- a/src/tasks/dump_db/dump-db.toml +++ b/src/tasks/dump_db/dump-db.toml @@ -138,11 +138,13 @@ total_downloads = "public" [publish_limit_buckets.columns] user_id = "private" +action = "private" tokens = "private" last_refill = "private" [publish_rate_overrides.columns] user_id = "private" +action = "private" burst = "private" expires_at = "private" diff --git a/src/tasks/update_downloads.rs b/src/tasks/update_downloads.rs index 925ce3c5502..252a7071125 100644 --- a/src/tasks/update_downloads.rs +++ b/src/tasks/update_downloads.rs @@ -98,7 +98,7 @@ mod test { name: "foo", ..Default::default() } - .create_or_update(conn, user_id, None) + .create_or_update(conn, user_id) .unwrap(); let version = NewVersion::new( krate.id, diff --git a/src/tests/builders/krate.rs b/src/tests/builders/krate.rs index 5669567587c..f1323fa8e02 100644 --- a/src/tests/builders/krate.rs +++ b/src/tests/builders/krate.rs @@ -114,9 +114,7 @@ impl<'a> CrateBuilder<'a> { pub fn build(mut self, connection: &PgConnection) -> AppResult { use diesel::{insert_into, select, update}; - let mut krate = self - .krate - .create_or_update(connection, self.owner_id, None)?; + let mut krate = self.krate.create_or_update(connection, self.owner_id)?; // Since we are using `NewCrate`, we can't set all the // crate properties in a single DB call. 
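Since `publish_rate_overrides` is now keyed by `(user_id, action)`, an override only raises the limit for the single action it targets. As a rough illustration of working against the new schema (not part of this change set; the burst of 50, the 30-day expiry, and the helper name are invented for the example), granting a user a larger burst for publishing new crates could look like this with Diesel:

use cargo_registry::rate_limiter::LimitedAction;
use cargo_registry::schema::publish_rate_overrides;
use chrono::{Duration, Utc};
use diesel::prelude::*;

/// Hypothetical admin helper: let `user_id` publish up to 50 new crates
/// before the rate limit kicks in, for the next 30 days only.
fn grant_publish_new_override(conn: &PgConnection, user_id: i32) -> QueryResult<usize> {
    diesel::insert_into(publish_rate_overrides::table)
        .values((
            publish_rate_overrides::user_id.eq(user_id),
            publish_rate_overrides::action.eq(LimitedAction::PublishNew),
            publish_rate_overrides::burst.eq(50),
            publish_rate_overrides::expires_at.eq(Utc::now().naive_utc() + Duration::days(30)),
        ))
        .execute(conn)
}

Rows created before this migration keep working: the `action` column added by `up.sql` defaults to `0`, which maps to `LimitedAction::PublishNew`.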
diff --git a/src/tests/http-data/krate_publish_publish_existing_crates_rate_limit b/src/tests/http-data/krate_publish_publish_existing_crates_rate_limit new file mode 100644 index 00000000000..20c7b357b17 --- /dev/null +++ b/src/tests/http-data/krate_publish_publish_existing_crates_rate_limit @@ -0,0 +1,204 @@ +[ + { + "request": { + "uri": "http://alexcrichton-test.s3.amazonaws.com/crates/rate_limited/rate_limited-1.0.0.crate", + "method": "PUT", + "headers": [ + [ + "content-length", + "35" + ], + [ + "content-type", + "application/x-tar" + ], + [ + "accept-encoding", + "gzip" + ], + [ + "authorization", + "AWS AKIAICL5IWUZYWWKA7JA:3P5wvArAHvV7o8atB0gUc0RTCCc=" + ], + [ + "accept", + "*/*" + ], + [ + "date", + "Fri, 15 Sep 2017 07:53:06 -0700" + ], + [ + "host", + "alexcrichton-test.s3.amazonaws.com" + ] + ], + "body": "H4sIAAAAAAAA/+3AAQEAAACCIP+vbkhQwKsBLq+17wAEAAA=" + }, + "response": { + "status": 200, + "headers": [ + [ + "x-amz-id-2", + "9ZLFTsEwh2iNu+BlWzaTZ85mFA7pxZgsGhQCj3Qi67LqT/iB5eiCOJQPYww2BEkoivJbr0mWruo=" + ], + [ + "date", + "Fri, 15 Sep 2017 14:53:07 GMT" + ], + [ + "content-length", + "0" + ], + [ + "x-amz-request-id", + "FD643F2FC49A7DF3" + ], + [ + "Server", + "AmazonS3" + ], + [ + "ETag", + "\"f9016ad360cebb4fe2e6e96e5949f022\"" + ] + ], + "body": "" + } + }, + { + "request": { + "uri": "http://alexcrichton-test.s3.amazonaws.com/crates/rate_limited/rate_limited-1.0.1.crate", + "method": "PUT", + "headers": [ + [ + "content-length", + "35" + ], + [ + "content-type", + "application/x-tar" + ], + [ + "accept-encoding", + "gzip" + ], + [ + "authorization", + "AWS AKIAICL5IWUZYWWKA7JA:3P5wvArAHvV7o8atB0gUc0RTCCc=" + ], + [ + "accept", + "*/*" + ], + [ + "date", + "Fri, 15 Sep 2017 07:53:06 -0700" + ], + [ + "host", + "alexcrichton-test.s3.amazonaws.com" + ] + ], + "body": "H4sIAAAAAAAA/+3AAQEAAACCIP+vbkhQwKsBLq+17wAEAAA=" + }, + "response": { + "status": 200, + "headers": [ + [ + "x-amz-id-2", + "9ZLFTsEwh2iNu+BlWzaTZ85mFA7pxZgsGhQCj3Qi67LqT/iB5eiCOJQPYww2BEkoivJbr0mWruo=" + ], + [ + "date", + "Fri, 15 Sep 2017 14:53:07 GMT" + ], + [ + "content-length", + "0" + ], + [ + "x-amz-request-id", + "FD643F2FC49A7DF3" + ], + [ + "Server", + "AmazonS3" + ], + [ + "ETag", + "\"f9016ad360cebb4fe2e6e96e5949f022\"" + ] + ], + "body": "" + } + }, + { + "request": { + "uri": + "http://alexcrichton-test.s3.amazonaws.com/crates/rate_limited/rate_limited-1.0.3.crate", + "method": "PUT", + "headers": [ + [ + "content-length", + "35" + ], + [ + "content-type", + "application/x-tar" + ], + [ + "accept-encoding", + "gzip" + ], + [ + "authorization", + "AWS AKIAICL5IWUZYWWKA7JA:3P5wvArAHvV7o8atB0gUc0RTCCc=" + ], + [ + "accept", + "*/*" + ], + [ + "date", + "Fri, 15 Sep 2017 07:53:06 -0700" + ], + [ + "host", + "alexcrichton-test.s3.amazonaws.com" + ] + ], + "body": "H4sIAAAAAAAA/+3AAQEAAACCIP+vbkhQwKsBLq+17wAEAAA=" + }, + "response": { + "status": 200, + "headers": [ + [ + "x-amz-id-2", + "9ZLFTsEwh2iNu+BlWzaTZ85mFA7pxZgsGhQCj3Qi67LqT/iB5eiCOJQPYww2BEkoivJbr0mWruo=" + ], + [ + "date", + "Fri, 15 Sep 2017 14:53:07 GMT" + ], + [ + "content-length", + "0" + ], + [ + "x-amz-request-id", + "FD643F2FC49A7DF3" + ], + [ + "Server", + "AmazonS3" + ], + [ + "ETag", + "\"f9016ad360cebb4fe2e6e96e5949f022\"" + ] + ], + "body": "" + } + } +] diff --git a/src/tests/http-data/krate_publish_publish_existing_crates_rate_limit_doesnt_affect_new_crates b/src/tests/http-data/krate_publish_publish_existing_crates_rate_limit_doesnt_affect_new_crates new file mode 100644 index 00000000000..81a309d6b33 --- /dev/null 
+++ b/src/tests/http-data/krate_publish_publish_existing_crates_rate_limit_doesnt_affect_new_crates @@ -0,0 +1,203 @@ +[ + { + "request": { + "uri": "http://alexcrichton-test.s3.amazonaws.com/crates/rate_limited1/rate_limited1-1.0.0.crate", + "method": "PUT", + "headers": [ + [ + "content-length", + "35" + ], + [ + "content-type", + "application/x-tar" + ], + [ + "accept-encoding", + "gzip" + ], + [ + "authorization", + "AWS AKIAICL5IWUZYWWKA7JA:3P5wvArAHvV7o8atB0gUc0RTCCc=" + ], + [ + "accept", + "*/*" + ], + [ + "date", + "Fri, 15 Sep 2017 07:53:06 -0700" + ], + [ + "host", + "alexcrichton-test.s3.amazonaws.com" + ] + ], + "body": "H4sIAAAAAAAA/+3AAQEAAACCIP+vbkhQwKsBLq+17wAEAAA=" + }, + "response": { + "status": 200, + "headers": [ + [ + "x-amz-id-2", + "9ZLFTsEwh2iNu+BlWzaTZ85mFA7pxZgsGhQCj3Qi67LqT/iB5eiCOJQPYww2BEkoivJbr0mWruo=" + ], + [ + "date", + "Fri, 15 Sep 2017 14:53:07 GMT" + ], + [ + "content-length", + "0" + ], + [ + "x-amz-request-id", + "FD643F2FC49A7DF3" + ], + [ + "Server", + "AmazonS3" + ], + [ + "ETag", + "\"f9016ad360cebb4fe2e6e96e5949f022\"" + ] + ], + "body": "" + } + }, + { + "request": { + "uri": "http://alexcrichton-test.s3.amazonaws.com/crates/rate_limited1/rate_limited1-1.0.1.crate", + "method": "PUT", + "headers": [ + [ + "content-length", + "35" + ], + [ + "content-type", + "application/x-tar" + ], + [ + "accept-encoding", + "gzip" + ], + [ + "authorization", + "AWS AKIAICL5IWUZYWWKA7JA:3P5wvArAHvV7o8atB0gUc0RTCCc=" + ], + [ + "accept", + "*/*" + ], + [ + "date", + "Fri, 15 Sep 2017 07:53:06 -0700" + ], + [ + "host", + "alexcrichton-test.s3.amazonaws.com" + ] + ], + "body": "H4sIAAAAAAAA/+3AAQEAAACCIP+vbkhQwKsBLq+17wAEAAA=" + }, + "response": { + "status": 200, + "headers": [ + [ + "x-amz-id-2", + "9ZLFTsEwh2iNu+BlWzaTZ85mFA7pxZgsGhQCj3Qi67LqT/iB5eiCOJQPYww2BEkoivJbr0mWruo=" + ], + [ + "date", + "Fri, 15 Sep 2017 14:53:07 GMT" + ], + [ + "content-length", + "0" + ], + [ + "x-amz-request-id", + "FD643F2FC49A7DF3" + ], + [ + "Server", + "AmazonS3" + ], + [ + "ETag", + "\"f9016ad360cebb4fe2e6e96e5949f022\"" + ] + ], + "body": "" + } + }, + { + "request": { + "uri": "http://alexcrichton-test.s3.amazonaws.com/crates/rate_limited2/rate_limited2-1.0.0.crate", + "method": "PUT", + "headers": [ + [ + "content-length", + "35" + ], + [ + "content-type", + "application/x-tar" + ], + [ + "accept-encoding", + "gzip" + ], + [ + "authorization", + "AWS AKIAICL5IWUZYWWKA7JA:3P5wvArAHvV7o8atB0gUc0RTCCc=" + ], + [ + "accept", + "*/*" + ], + [ + "date", + "Fri, 15 Sep 2017 07:53:06 -0700" + ], + [ + "host", + "alexcrichton-test.s3.amazonaws.com" + ] + ], + "body": "H4sIAAAAAAAA/+3AAQEAAACCIP+vbkhQwKsBLq+17wAEAAA=" + }, + "response": { + "status": 200, + "headers": [ + [ + "x-amz-id-2", + "9ZLFTsEwh2iNu+BlWzaTZ85mFA7pxZgsGhQCj3Qi67LqT/iB5eiCOJQPYww2BEkoivJbr0mWruo=" + ], + [ + "date", + "Fri, 15 Sep 2017 14:53:07 GMT" + ], + [ + "content-length", + "0" + ], + [ + "x-amz-request-id", + "FD643F2FC49A7DF3" + ], + [ + "Server", + "AmazonS3" + ], + [ + "ETag", + "\"f9016ad360cebb4fe2e6e96e5949f022\"" + ] + ], + "body": "" + } + } +] diff --git a/src/tests/http-data/krate_publish_publish_new_crate_rate_limited b/src/tests/http-data/krate_publish_publish_new_crates_rate_limit similarity index 100% rename from src/tests/http-data/krate_publish_publish_new_crate_rate_limited rename to src/tests/http-data/krate_publish_publish_new_crates_rate_limit diff --git a/src/tests/http-data/krate_publish_publish_rate_limit_doesnt_affect_existing_crates 
b/src/tests/http-data/krate_publish_publish_new_crates_rate_limit_doesnt_affect_existing_crates similarity index 100% rename from src/tests/http-data/krate_publish_publish_rate_limit_doesnt_affect_existing_crates rename to src/tests/http-data/krate_publish_publish_new_crates_rate_limit_doesnt_affect_existing_crates diff --git a/src/tests/krate/publish.rs b/src/tests/krate/publish.rs index 80d87d1164f..2d7dc20bd5d 100644 --- a/src/tests/krate/publish.rs +++ b/src/tests/krate/publish.rs @@ -5,6 +5,7 @@ use cargo_registry::controllers::krate::publish::{ missing_metadata_error_message, MISSING_RIGHTS_ERROR_MESSAGE, WILDCARD_ERROR_MESSAGE, }; use cargo_registry::models::krate::MAX_NAME_LENGTH; +use cargo_registry::rate_limiter::LimitedAction; use cargo_registry::schema::{api_tokens, emails, versions_published_by}; use cargo_registry::views::GoodCrate; use diesel::{delete, update, ExpressionMethods, QueryDsl, RunQueryDsl}; @@ -912,9 +913,9 @@ fn new_krate_tarball_with_hard_links() { } #[test] -fn publish_new_crate_rate_limited() { +fn publish_new_crates_rate_limit() { let (app, anon, _, token) = TestApp::full() - .with_publish_rate_limit(Duration::from_millis(500), 1) + .with_rate_limit(LimitedAction::PublishNew, Duration::from_millis(500), 1) .with_token(); // Upload a new crate @@ -941,9 +942,9 @@ fn publish_new_crate_rate_limited() { } #[test] -fn publish_rate_limit_doesnt_affect_existing_crates() { +fn publish_new_crates_rate_limit_doesnt_affect_existing_crates() { let (app, _, _, token) = TestApp::full() - .with_publish_rate_limit(Duration::from_millis(500), 1) + .with_rate_limit(LimitedAction::PublishNew, Duration::from_millis(500), 1) .with_token(); // Upload a new crate @@ -954,3 +955,70 @@ fn publish_rate_limit_doesnt_affect_existing_crates() { token.enqueue_publish(new_version).good(); app.run_pending_background_jobs(); } + +#[test] +fn publish_existing_crates_rate_limit() { + let (app, anon, _, token) = TestApp::full() + .with_rate_limit( + LimitedAction::PublishExisting, + Duration::from_millis(500), + 1, + ) + .with_token(); + + // Upload a new crate (uses a different rate limit not tested here) + let crate_to_publish = PublishBuilder::new("rate_limited").version("1.0.0"); + token.enqueue_publish(crate_to_publish).good(); + + // Upload a new version of the crate + let crate_to_publish = PublishBuilder::new("rate_limited").version("1.0.1"); + token.enqueue_publish(crate_to_publish).good(); + + // Uploading a second version of the crate is limited + let crate_to_publish = PublishBuilder::new("rate_limited").version("1.0.2"); + let response = token.enqueue_publish(crate_to_publish); + assert_eq!(response.status(), StatusCode::TOO_MANY_REQUESTS); + app.run_pending_background_jobs(); + + let json = anon.show_crate("rate_limited"); + assert_eq!(json.krate.max_version, "1.0.1"); + + // Wait for the limit to be up + thread::sleep(Duration::from_millis(500)); + + let crate_to_publish = PublishBuilder::new("rate_limited").version("1.0.3"); + token.enqueue_publish(crate_to_publish).good(); + + let json = anon.show_crate("rate_limited"); + assert_eq!(json.krate.max_version, "1.0.3"); +} + +#[test] +fn publish_existing_crates_rate_limit_doesnt_affect_new_crates() { + let (app, _, _, token) = TestApp::full() + .with_rate_limit( + LimitedAction::PublishExisting, + Duration::from_millis(500), + 1, + ) + .with_token(); + + // Upload a new crate (uses a different rate limit not tested here) + let crate_to_publish = PublishBuilder::new("rate_limited1").version("1.0.0"); + 
token.enqueue_publish(crate_to_publish).good();
+
+ // Upload a new version of the crate
+ let crate_to_publish = PublishBuilder::new("rate_limited1").version("1.0.1");
+ token.enqueue_publish(crate_to_publish).good();
+
+ // Uploading a second version of the crate is limited
+ let crate_to_publish = PublishBuilder::new("rate_limited1").version("1.0.2");
+ let response = token.enqueue_publish(crate_to_publish);
+ assert_eq!(response.status(), StatusCode::TOO_MANY_REQUESTS);
+ app.run_pending_background_jobs();
+
+ // Uploading another crate should still work, as the rate limit is separate
+ let crate_to_publish = PublishBuilder::new("rate_limited2");
+ token.enqueue_publish(crate_to_publish).good();
+ app.run_pending_background_jobs();
+}
diff --git a/src/tests/util/test_app.rs b/src/tests/util/test_app.rs
index 12ef95d8aa5..8ba60a861ef 100644
--- a/src/tests/util/test_app.rs
+++ b/src/tests/util/test_app.rs
@@ -6,8 +6,10 @@ use cargo_registry::{
background_jobs::Environment,
db::DieselPool,
git::{Credentials, RepositoryConfig},
+ rate_limiter::{LimitedAction, RateLimiterConfig},
App, Emails,
};
+use std::collections::HashMap;
use std::{rc::Rc, sync::Arc, time::Duration};
use cargo_registry::git::Repository as WorkerRepository;
@@ -277,10 +279,11 @@ impl TestAppBuilder {
self
}
- pub fn with_publish_rate_limit(self, rate: Duration, burst: i32) -> Self {
+ pub fn with_rate_limit(self, action: LimitedAction, rate: Duration, burst: i32) -> Self {
self.with_config(|config| {
- config.publish_rate_limit.rate = rate;
- config.publish_rate_limit.burst = burst;
+ config
+ .rate_limiter
+ .insert(action, RateLimiterConfig { rate, burst });
})
}
@@ -314,6 +317,17 @@ pub fn init_logger() {
}
fn simple_config() -> config::Server {
+ let mut rate_limiter = HashMap::new();
+ for action in LimitedAction::VARIANTS {
+ rate_limiter.insert(
+ *action,
+ RateLimiterConfig {
+ rate: Duration::from_secs(1),
+ burst: 1024,
+ },
+ );
+ }
+
config::Server {
base: config::Base::test(),
db: config::DatabasePools::test_from_environment(),
@@ -323,7 +337,6 @@ fn simple_config() -> config::Server {
gh_base_url: "http://api.github.com".to_string(),
max_upload_size: 3000,
max_unpack_size: 2000,
- publish_rate_limit: Default::default(),
blocked_traffic: Default::default(),
max_allowed_page_offset: 200,
page_offset_ua_blocklist: vec![],
@@ -334,6 +347,7 @@ fn simple_config() -> config::Server {
metrics_authorization_token: None,
use_test_database_pool: true,
instance_metrics_log_every_seconds: None,
+ rate_limiter,
}
}
diff --git a/src/util.rs b/src/util.rs
index f1ed10eb1e3..0f9e0866a81 100644
--- a/src/util.rs
+++ b/src/util.rs
@@ -53,3 +53,43 @@ impl Maximums {
}
}
}
+
+#[macro_export]
+macro_rules! pg_enum {
+ (
+ $vis:vis enum $name:ident {
+ $($item:ident = $int:expr,)*
+ }
+ ) => {
+ #[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, Serialize, Deserialize, FromSqlRow, AsExpression)]
+ #[sql_type = "diesel::sql_types::Integer"]
+ #[serde(rename_all = "snake_case")]
+ #[repr(i32)]
+ $vis enum $name {
+ $($item = $int,)*
+ }
+
+ impl diesel::deserialize::FromSql<diesel::sql_types::Integer, diesel::pg::Pg> for $name {
+ fn from_sql(bytes: Option<&[u8]>) -> diesel::deserialize::Result<Self> {
+ match <i32 as diesel::deserialize::FromSql<diesel::sql_types::Integer, diesel::pg::Pg>>::from_sql(bytes)? {
+ $($int => Ok(Self::$item),)*
+ n => Err(format!("unknown value: {}", n).into()),
+ }
+ }
+ }
+
+ impl diesel::serialize::ToSql<diesel::sql_types::Integer, diesel::pg::Pg> for $name {
+ fn to_sql<W: std::io::Write>(
+ &self,
+ out: &mut diesel::serialize::Output<'_, W, diesel::pg::Pg>,
+ ) -> diesel::serialize::Result {
+ diesel::serialize::ToSql::<diesel::sql_types::Integer, diesel::pg::Pg>::to_sql(&(*self as i32), out)
+ }
+ }
+
+ impl $name {
+ #[allow(unused)]
+ $vis const VARIANTS: &'static [Self] = &[$($name::$item),*];
+ }
+ }
+}
diff --git a/src/util/errors/json.rs b/src/util/errors/json.rs
index 3622f41706f..dadec076275 100644
--- a/src/util/errors/json.rs
+++ b/src/util/errors/json.rs
@@ -1,6 +1,7 @@
use std::fmt;
use super::{AppError, InternalAppErrorStatic};
+use crate::rate_limiter::LimitedAction;
use crate::util::{json_response, AppResponse};
use chrono::NaiveDateTime;
@@ -90,6 +91,7 @@ pub(super) struct BadRequest(pub(super) String);
pub(super) struct ServerError(pub(super) String);
#[derive(Debug)]
pub(crate) struct TooManyRequests {
+ pub action: LimitedAction,
pub retry_after: NaiveDateTime,
}
@@ -137,9 +139,9 @@ impl AppError for TooManyRequests {
let retry_after = self.retry_after.format(HTTP_DATE_FORMAT);
let detail = format!(
- "You have published too many crates in a \
- short period of time. Please try again after {} or email \
+ "{} Please try again after {} or email \
help@crates.io to have your limit increased.",
+ self.action.error_message(),
retry_after
);
let mut response = json_error(&detail, StatusCode::TOO_MANY_REQUESTS);
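One note on the new `pg_enum!` macro that closes out this change: besides the Diesel `FromSql`/`ToSql` plumbing, it also emits a `VARIANTS` slice, which is what lets `src/config.rs` build a `RateLimiterConfig` for every action from the `RATE_LIMITER_<KEY>_RATE_SECONDS` and `RATE_LIMITER_<KEY>_BURST` environment variables (falling back to 600 seconds / burst 5 for `PUBLISH_NEW` and 60 seconds / burst 30 for `PUBLISH_EXISTING`). A minimal usage sketch follows; the enum is hypothetical and assumes the code lives inside the crate, where the serde and diesel derive macros the expansion relies on are already in scope:

crate::pg_enum! {
    pub enum ExampleState {
        Pending = 0,
        Done = 1,
    }
}

#[test]
fn example_state_variants() {
    // `VARIANTS` lists every declared value in order; the config loader in
    // `src/config.rs` iterates the equivalent slice on `LimitedAction`.
    assert_eq!(ExampleState::VARIANTS, &[ExampleState::Pending, ExampleState::Done]);
}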