diff --git a/.env.sample b/.env.sample
index 38bdda11fbe..896aeacfdbd 100644
--- a/.env.sample
+++ b/.env.sample
@@ -45,6 +45,13 @@ export TEST_DATABASE_URL=
 # Uses AWS credentials.
 # export CLOUDFRONT_DISTRIBUTION=
 
+# Configuration for the CDN log queue. You can leave these commented out if
+# you're not using the CDN log queue.
+# export CDN_LOG_QUEUE_ACCESS_KEY=
+# export CDN_LOG_QUEUE_SECRET_KEY=
+# export CDN_LOG_QUEUE_URL=
+# export CDN_LOG_QUEUE_REGION=
+
 # Upstream location of the registry index. Background jobs will push to
 # this URL. The default points to a local index for development.
 # Run `./script/init-local-index.sh` to initialize this repo.
diff --git a/Cargo.lock b/Cargo.lock
index fe7b10abc29..923c2ef4dbb 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -291,6 +291,28 @@ dependencies = [
  "tracing",
 ]
 
+[[package]]
+name = "aws-sdk-sqs"
+version = "1.12.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "c1bd3f54f12028a536c9b4450c425d7c510f1472d81169d69b603263a4dbf6a3"
+dependencies = [
+ "aws-credential-types",
+ "aws-runtime",
+ "aws-smithy-async",
+ "aws-smithy-http",
+ "aws-smithy-json",
+ "aws-smithy-runtime",
+ "aws-smithy-runtime-api",
+ "aws-smithy-types",
+ "aws-types",
+ "bytes",
+ "http 0.2.11",
+ "once_cell",
+ "regex-lite",
+ "tracing",
+]
+
 [[package]]
 name = "aws-sigv4"
 version = "1.1.4"
@@ -947,6 +969,7 @@ dependencies = [
  "aws-credential-types",
  "aws-ip-ranges",
  "aws-sdk-cloudfront",
+ "aws-sdk-sqs",
  "axum",
  "axum-extra",
  "base64 0.21.7",
@@ -957,6 +980,7 @@ dependencies = [
  "claims",
  "clap",
  "cookie",
+ "crates_io_cdn_logs",
  "crates_io_env_vars",
  "crates_io_github",
  "crates_io_index",
diff --git a/Cargo.toml b/Cargo.toml
index 0e43938e452..9d2d2eb8c1a 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -52,11 +52,13 @@ async-trait = "=0.1.77"
 aws-credential-types = { version = "=1.1.4", features = ["hardcoded-credentials"] }
 aws-ip-ranges = "=0.90.0"
 aws-sdk-cloudfront = "=1.12.0"
+aws-sdk-sqs = "=1.12.0"
 axum = { version = "=0.7.4", features = ["macros", "matched-path"] }
 axum-extra = { version = "=0.9.2", features = ["cookie-signed", "typed-header"] }
 base64 = "=0.21.7"
 bigdecimal = "=0.4.2"
 cargo-manifest = "=0.13.0"
+crates_io_cdn_logs = { path = "crates_io_cdn_logs" }
 crates_io_env_vars = { path = "crates_io_env_vars" }
 crates_io_github = { path = "crates_io_github" }
 crates_io_index = { path = "crates_io_index" }
diff --git a/src/admin/enqueue_job.rs b/src/admin/enqueue_job.rs
index 04f55ce67ee..0c4ffcbccca 100644
--- a/src/admin/enqueue_job.rs
+++ b/src/admin/enqueue_job.rs
@@ -31,6 +31,7 @@ pub enum Command {
         #[arg()]
         name: String,
     },
+    ProcessCdnLogQueue(jobs::ProcessCdnLogQueue),
     SyncAdmins {
         /// Force a sync even if one is already in progress
         #[arg(long)]
@@ -89,6 +90,9 @@ pub fn run(command: Command) -> Result<()> {
         Command::DailyDbMaintenance => {
             jobs::DailyDbMaintenance.enqueue(conn)?;
         }
+        Command::ProcessCdnLogQueue(job) => {
+            job.enqueue(conn)?;
+        }
         Command::SquashIndex => {
             jobs::SquashIndex.enqueue(conn)?;
         }
diff --git a/src/app.rs b/src/app.rs
index 9cc8f460039..43099756b53 100644
--- a/src/app.rs
+++ b/src/app.rs
@@ -34,7 +34,7 @@ pub struct App {
     pub github_oauth: BasicClient,
 
     /// The server configuration
-    pub config: config::Server,
+    pub config: Arc<config::Server>,
 
     /// Cache the `version_id` of a `canonical_crate_name:semver` pair
     ///
@@ -158,7 +158,7 @@ impl App {
             instance_metrics,
             balance_capacity: Default::default(),
             rate_limiter: RateLimiter::new(config.rate_limiter.clone()),
-            config,
+            config: Arc::new(config),
         }
     }
 
diff --git a/src/bin/background-worker.rs b/src/bin/background-worker.rs
index 0c6a8218397..25ca227ccfe 100644
--- a/src/bin/background-worker.rs
+++ b/src/bin/background-worker.rs
@@ -86,6 +86,7 @@ fn main() -> anyhow::Result<()> {
         .build_unchecked(ConnectionManager::new(db_url));
 
     let environment = Environment::builder()
+        .config(Arc::new(config))
         .repository_config(repository_config)
         .cloudfront(cloudfront)
         .fastly(fastly)
diff --git a/src/config.rs b/src/config.rs
index 39eb6068d3b..a71a70a1253 100644
--- a/src/config.rs
+++ b/src/config.rs
@@ -1,11 +1,15 @@
 mod balance_capacity;
 mod base;
+mod cdn_log_queue;
+mod cdn_log_storage;
 mod database_pools;
 mod sentry;
 mod server;
 
 pub use self::balance_capacity::BalanceCapacityConfig;
 pub use self::base::Base;
+pub use self::cdn_log_queue::CdnLogQueueConfig;
+pub use self::cdn_log_storage::CdnLogStorageConfig;
 pub use self::database_pools::{DatabasePools, DbPoolConfig};
 pub use self::sentry::SentryConfig;
 pub use self::server::Server;
diff --git a/src/config/cdn_log_queue.rs b/src/config/cdn_log_queue.rs
new file mode 100644
index 00000000000..d279355b90d
--- /dev/null
+++ b/src/config/cdn_log_queue.rs
@@ -0,0 +1,33 @@
+use crates_io_env_vars::{required_var, var};
+use secrecy::SecretString;
+
+#[derive(Debug, Clone)]
+pub enum CdnLogQueueConfig {
+    SQS {
+        access_key: String,
+        secret_key: SecretString,
+        queue_url: String,
+        region: String,
+    },
+    Mock,
+}
+
+impl CdnLogQueueConfig {
+    pub fn from_env() -> anyhow::Result<Self> {
+        if let Some(queue_url) = var("CDN_LOG_QUEUE_URL")? {
+            let access_key = required_var("CDN_LOG_QUEUE_ACCESS_KEY")?;
+            let secret_key = required_var("CDN_LOG_QUEUE_SECRET_KEY")?.into();
+            let region = required_var("CDN_LOG_QUEUE_REGION")?;
+
+            return Ok(Self::SQS {
+                access_key,
+                secret_key,
+                queue_url,
+                region,
+            });
+        }
+
+        warn!("Falling back to mocked CDN log queue");
+        Ok(Self::Mock)
+    }
+}
diff --git a/src/config/cdn_log_storage.rs b/src/config/cdn_log_storage.rs
new file mode 100644
index 00000000000..6b030c51b9c
--- /dev/null
+++ b/src/config/cdn_log_storage.rs
@@ -0,0 +1,53 @@
+use anyhow::Context;
+use crates_io_env_vars::{required_var, var};
+use secrecy::SecretString;
+use std::path::PathBuf;
+
+#[derive(Debug, Clone)]
+pub enum CdnLogStorageConfig {
+    S3 {
+        access_key: String,
+        secret_key: SecretString,
+    },
+    Local {
+        path: PathBuf,
+    },
+    Memory,
+}
+
+impl CdnLogStorageConfig {
+    pub fn s3(access_key: String, secret_key: SecretString) -> Self {
+        Self::S3 {
+            access_key,
+            secret_key,
+        }
+    }
+
+    pub fn local(path: PathBuf) -> Self {
+        Self::Local { path }
+    }
+
+    pub fn memory() -> Self {
+        Self::Memory
+    }
+
+    pub fn from_env() -> anyhow::Result<Self> {
+        if let Some(access_key) = var("AWS_ACCESS_KEY")? {
+            let secret_key = required_var("AWS_SECRET_KEY")?.into();
+            return Ok(Self::s3(access_key, secret_key));
+        }
+
+        let current_dir = std::env::current_dir();
+        let current_dir = current_dir.context("Failed to read the current directory")?;
+
+        let path = current_dir.join("local_uploads");
+        let path_display = path.display();
+        if path.exists() {
+            info!("Falling back to local CDN log storage at {path_display}");
+            return Ok(Self::local(path));
+        }
+
+        warn!("Falling back to in-memory CDN log storage because {path_display} does not exist");
+        Ok(Self::memory())
+    }
+}
diff --git a/src/config/server.rs b/src/config/server.rs
index 746bc5a6909..e484561b9b6 100644
--- a/src/config/server.rs
+++ b/src/config/server.rs
@@ -8,6 +8,8 @@ use crate::Env;
 use super::base::Base;
 use super::database_pools::DatabasePools;
 use crate::config::balance_capacity::BalanceCapacityConfig;
+use crate::config::cdn_log_storage::CdnLogStorageConfig;
+use crate::config::CdnLogQueueConfig;
 use crate::middleware::cargo_compat::StatusCodeConfig;
 use crate::storage::StorageConfig;
 use crates_io_env_vars::{list, list_parsed, required_var, var, var_parsed};
@@ -34,6 +36,8 @@ pub struct Server {
     pub max_blocking_threads: Option<usize>,
     pub db: DatabasePools,
     pub storage: StorageConfig,
+    pub cdn_log_storage: CdnLogStorageConfig,
+    pub cdn_log_queue: CdnLogQueueConfig,
     pub session_key: cookie::Key,
     pub gh_client_id: ClientId,
     pub gh_client_secret: ClientSecret,
@@ -172,6 +176,8 @@ impl Server {
         Ok(Server {
             db: DatabasePools::full_from_environment(&base)?,
             storage,
+            cdn_log_storage: CdnLogStorageConfig::from_env()?,
+            cdn_log_queue: CdnLogQueueConfig::from_env()?,
             base,
             ip,
             port,
diff --git a/src/lib.rs b/src/lib.rs
index b0af96db80d..f57b74abe12 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -53,6 +53,7 @@ mod router;
 pub mod schema;
 pub mod sentry;
 pub mod sql;
+pub mod sqs;
 pub mod ssh;
 pub mod storage;
 pub mod tasks;
diff --git a/src/sqs.rs b/src/sqs.rs
new file mode 100644
index 00000000000..1521b496b29
--- /dev/null
+++ b/src/sqs.rs
@@ -0,0 +1,94 @@
+use anyhow::Context;
+use async_trait::async_trait;
+use aws_credential_types::Credentials;
+use aws_sdk_sqs::config::{BehaviorVersion, Region};
+use aws_sdk_sqs::operation::receive_message::ReceiveMessageOutput;
+use mockall::automock;
+
+/// The [SqsQueue] trait defines a basic interface for interacting with an
+/// AWS SQS queue.
+///
+/// A [MockSqsQueue] struct is automatically generated by the [automock]
+/// attribute. This struct can be used in unit tests to mock the behavior of
+/// the [SqsQueue] trait.
+///
+/// The [SqsQueueImpl] struct is the actual implementation of the trait.
+#[automock]
+#[async_trait]
+pub trait SqsQueue {
+    async fn receive_messages(&self, max_messages: i32) -> anyhow::Result<ReceiveMessageOutput>;
+    async fn delete_message(&self, receipt_handle: &str) -> anyhow::Result<()>;
+}
+
+/// The [SqsQueueImpl] struct is the actual implementation of the [SqsQueue]
+/// trait, which interacts with the real AWS API servers.
+#[derive(Debug, Clone)]
+pub struct SqsQueueImpl {
+    client: aws_sdk_sqs::Client,
+    queue_url: String,
+}
+
+impl SqsQueueImpl {
+    pub fn new(queue_url: impl Into<String>, region: Region, credentials: Credentials) -> Self {
+        let config = aws_sdk_sqs::Config::builder()
+            .credentials_provider(credentials)
+            .region(region)
+            .behavior_version(BehaviorVersion::v2023_11_09())
+            .build();
+
+        let client = aws_sdk_sqs::Client::from_conf(config);
+        let queue_url = queue_url.into();
+
+        SqsQueueImpl { client, queue_url }
+    }
+}
+
+#[async_trait]
+impl SqsQueue for SqsQueueImpl {
+    async fn receive_messages(&self, max_messages: i32) -> anyhow::Result<ReceiveMessageOutput> {
+        let response = self
+            .client
+            .receive_message()
+            .max_number_of_messages(max_messages)
+            .queue_url(&self.queue_url)
+            .send()
+            .await
+            .context("Failed to receive SQS queue message")?;
+
+        Ok(response)
+    }
+
+    async fn delete_message(&self, receipt_handle: &str) -> anyhow::Result<()> {
+        self.client
+            .delete_message()
+            .receipt_handle(receipt_handle)
+            .queue_url(&self.queue_url)
+            .send()
+            .await
+            .context("Failed to delete SQS queue message")?;
+
+        Ok(())
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_constructor() {
+        let credentials = Credentials::new(
+            "ANOTREAL",
+            "notrealrnrELgWzOk3IfjzDKtFBhDby",
+            None,
+            None,
+            "test",
+        );
+
+        let queue_url = "https://sqs.us-west-1.amazonaws.com/359172468976/cdn-log-event-queue";
+        let region = Region::new("us-west-1");
+
+        // Check that `SqsQueueImpl::new()` does not panic.
+        let _queue = SqsQueueImpl::new(queue_url, region, credentials);
+    }
+}
diff --git a/src/tests/util/test_app.rs b/src/tests/util/test_app.rs
index 8a91ca0e0b3..ee769e66f58 100644
--- a/src/tests/util/test_app.rs
+++ b/src/tests/util/test_app.rs
@@ -2,7 +2,10 @@ use super::{MockAnonymousUser, MockCookieUser, MockTokenUser};
 use crate::util::chaosproxy::ChaosProxy;
 use crate::util::github::{MockGitHubClient, MOCK_GITHUB_DATA};
 use anyhow::Context;
-use crates_io::config::{self, BalanceCapacityConfig, Base, DatabasePools, DbPoolConfig};
+use crates_io::config::{
+    self, BalanceCapacityConfig, Base, CdnLogQueueConfig, CdnLogStorageConfig, DatabasePools,
+    DbPoolConfig,
+};
 use crates_io::middleware::cargo_compat::StatusCodeConfig;
 use crates_io::models::token::{CrateScope, EndpointScope};
 use crates_io::rate_limiter::{LimitedAction, RateLimiterConfig};
@@ -270,6 +273,7 @@ impl TestAppBuilder {
         };
 
         let environment = Environment::builder()
+            .config(app.config.clone())
             .repository_config(repository_config)
             .storage(app.storage.clone())
             .connection_pool(app.primary_database.clone())
@@ -421,6 +425,8 @@ fn simple_config() -> config::Server {
         max_blocking_threads: None,
         db,
         storage,
+        cdn_log_queue: CdnLogQueueConfig::Mock,
+        cdn_log_storage: CdnLogStorageConfig::memory(),
         session_key: cookie::Key::derive_from("test this has to be over 32 bytes long".as_bytes()),
         gh_client_id: ClientId::new(dotenvy::var("GH_CLIENT_ID").unwrap_or_default()),
         gh_client_secret: ClientSecret::new(dotenvy::var("GH_CLIENT_SECRET").unwrap_or_default()),
diff --git a/src/worker/environment.rs b/src/worker/environment.rs
index 94143a36944..61d6eae6b59 100644
--- a/src/worker/environment.rs
+++ b/src/worker/environment.rs
@@ -16,6 +16,8 @@ use std::time::Instant;
 #[derive(Builder)]
 #[builder(pattern = "owned")]
 pub struct Environment {
+    pub config: Arc<config::Server>,
+
     repository_config: RepositoryConfig,
     #[builder(default, setter(skip))]
     repository: Mutex<Option<Repository>>,
diff --git a/src/worker/jobs/downloads.rs b/src/worker/jobs/downloads.rs
new file mode 100644
index 00000000000..b672a23907e
--- /dev/null
+++ b/src/worker/jobs/downloads.rs
@@ -0,0 +1,521 @@
+mod message;
+
+use crate::config::{CdnLogQueueConfig, CdnLogStorageConfig};
+use crate::db::DieselPool;
+use crate::sqs::{MockSqsQueue, SqsQueue, SqsQueueImpl};
+use crate::tasks::spawn_blocking;
+use crate::worker::Environment;
+use anyhow::Context;
+use aws_credential_types::Credentials;
+use aws_sdk_sqs::config::Region;
+use crates_io_cdn_logs::{count_downloads, Decompressor};
+use crates_io_worker::BackgroundJob;
+use object_store::aws::AmazonS3Builder;
+use object_store::local::LocalFileSystem;
+use object_store::memory::InMemory;
+use object_store::ObjectStore;
+use std::cmp::Reverse;
+use std::collections::HashSet;
+use std::sync::Arc;
+use std::time::Instant;
+use tokio::io::BufReader;
+
+#[derive(Debug, Serialize, Deserialize)]
+pub struct ProcessCdnLog {
+    region: String,
+    bucket: String,
+    path: String,
+}
+
+impl ProcessCdnLog {
+    pub fn new(region: String, bucket: String, path: String) -> Self {
+        Self {
+            region,
+            bucket,
+            path,
+        }
+    }
+}
+
+impl BackgroundJob for ProcessCdnLog {
+    const JOB_NAME: &'static str = "process_cdn_log";
+
+    type Context = Arc<Environment>;
+
+    async fn run(&self, ctx: Self::Context) -> anyhow::Result<()> {
+        let store = self
+            .build_store(&ctx.config.cdn_log_storage)
+            .context("Failed to build object store")?;
+
+        self.run(store).await
+    }
+}
+
+impl ProcessCdnLog {
+    fn build_store(&self, config: &CdnLogStorageConfig) -> anyhow::Result<Box<dyn ObjectStore>> {
+        match config {
+            CdnLogStorageConfig::S3 {
+                access_key,
+                secret_key,
+            } => {
+                use secrecy::ExposeSecret;
+
+                let store = AmazonS3Builder::new()
+                    .with_region(&self.region)
+                    .with_bucket_name(&self.bucket)
+                    .with_access_key_id(access_key)
+                    .with_secret_access_key(secret_key.expose_secret())
+                    .build()?;
+
+                Ok(Box::new(store))
+            }
+            CdnLogStorageConfig::Local { path } => {
+                Ok(Box::new(LocalFileSystem::new_with_prefix(path)?))
+            }
+            CdnLogStorageConfig::Memory => Ok(Box::new(InMemory::new())),
+        }
+    }
+
+    async fn run(&self, store: Box<dyn ObjectStore>) -> anyhow::Result<()> {
+        let path = object_store::path::Path::parse(&self.path)
+            .with_context(|| format!("Failed to parse path: {:?}", self.path))?;
+
+        let meta = store.head(&path).await;
+        let meta = meta.with_context(|| format!("Failed to request metadata for {path:?}"))?;
+
+        let reader = object_store::buffered::BufReader::new(Arc::new(store), &meta);
+        let decompressor = Decompressor::from_extension(reader, path.extension())?;
+        let reader = BufReader::new(decompressor);
+
+        let parse_start = Instant::now();
+        let downloads = count_downloads(reader).await?;
+        let parse_duration = parse_start.elapsed();
+
+        // TODO: for now this background job just prints out the results, but
+        // eventually it should insert them into the database instead.
+
+        if downloads.as_inner().is_empty() {
+            info!("No downloads found in log file: {path}");
+            return Ok(());
+        }
+
+        let num_crates = downloads
+            .as_inner()
+            .iter()
+            .map(|((_, krate, _), _)| krate)
+            .collect::<HashSet<_>>()
+            .len();
+
+        let total_inserts = downloads.as_inner().len();
+
+        let total_downloads = downloads
+            .as_inner()
+            .iter()
+            .map(|(_, downloads)| downloads)
+            .sum::<u64>();
+
+        info!("Log file: {path}");
+        info!("Number of crates: {num_crates}");
+        info!("Number of needed inserts: {total_inserts}");
+        info!("Total number of downloads: {total_downloads}");
+        info!("Time to parse: {parse_duration:?}");
+
+        let mut downloads = downloads.into_inner().into_iter().collect::<Vec<_>>();
+        downloads.sort_by_key(|((_, _, _), downloads)| Reverse(*downloads));
+
+        let top_downloads = downloads
+            .into_iter()
+            .take(30)
+            .map(|((krate, version, date), downloads)| {
+                format!("{date} {krate}@{version} .. {downloads}")
+            })
+            .collect::<Vec<_>>();
+
+        info!("Top 30 downloads: {top_downloads:?}");
+
+        Ok(())
+    }
+}
+
+#[derive(Debug, Serialize, Deserialize, clap::Parser)]
+pub struct ProcessCdnLogQueue {
+    /// The maximum number of messages to receive from the queue and process.
+    #[clap(long, default_value = "1")]
+    max_messages: usize,
+}
+
+impl BackgroundJob for ProcessCdnLogQueue {
+    const JOB_NAME: &'static str = "process_cdn_log_queue";
+
+    type Context = Arc<Environment>;
+
+    async fn run(&self, ctx: Self::Context) -> anyhow::Result<()> {
+        let queue = Self::build_queue(&ctx.config.cdn_log_queue);
+        self.run(queue, &ctx.connection_pool).await
+    }
+}
+
+impl ProcessCdnLogQueue {
+    fn build_queue(config: &CdnLogQueueConfig) -> Box<dyn SqsQueue> {
+        match config {
+            CdnLogQueueConfig::Mock => Box::new(MockSqsQueue::new()),
+            CdnLogQueueConfig::SQS {
+                access_key,
+                secret_key,
+                region,
+                queue_url,
+            } => {
+                use secrecy::ExposeSecret;
+
+                let secret_key = secret_key.expose_secret();
+                let credentials = Credentials::from_keys(access_key, secret_key, None);
+
+                let region = Region::new(region.to_owned());
+
+                Box::new(SqsQueueImpl::new(queue_url, region, credentials))
+            }
+        }
+    }
+
+    async fn run(
+        &self,
+        queue: Box<dyn SqsQueue>,
+        connection_pool: &DieselPool,
+    ) -> anyhow::Result<()> {
+        const MAX_BATCH_SIZE: usize = 10;
+
+        info!("Receiving messages from the CDN log queue…");
+        let mut num_remaining = self.max_messages;
+        while num_remaining > 0 {
+            let batch_size = num_remaining.min(MAX_BATCH_SIZE);
+            num_remaining -= batch_size;
+
+            debug!("Receiving next {batch_size} messages from the CDN log queue…");
+            let response = queue.receive_messages(batch_size as i32).await?;
+
+            let messages = response.messages();
+            debug!(
+                "Received {num_messages} messages from the CDN log queue",
+                num_messages = messages.len()
+            );
+            if messages.is_empty() {
+                info!("No more messages to receive from the CDN log queue");
+                break;
+            }
+
+            for message in messages {
+                let message_id = message.message_id().unwrap_or("<unknown>");
+                debug!("Processing message: {message_id}");
+
+                let Some(receipt_handle) = message.receipt_handle() else {
+                    warn!("Message {message_id} has no receipt handle; skipping");
+                    continue;
+                };
+
+                debug!("Deleting message {message_id} from the CDN log queue…");
+                queue
+                    .delete_message(receipt_handle)
+                    .await
+                    .with_context(|| {
+                        format!("Failed to delete message {message_id} from the CDN log queue")
+                    })?;
+
+                let Some(body) = message.body() else {
+                    warn!("Message {message_id} has no body; skipping");
+                    continue;
+                };
+
+                let message = match serde_json::from_str::<message::Message>(body) {
+                    Ok(message) => message,
+                    Err(err) => {
+                        warn!("Failed to parse message {message_id}: {err}");
+                        continue;
+                    }
+                };
+
+                if message.records.is_empty() {
+                    warn!("Message {message_id} has no records; skipping");
+                    continue;
+                }
+
+                let pool = connection_pool.clone();
+                spawn_blocking({
+                    let message_id = message_id.to_owned();
+                    move || {
+                        let mut conn = pool
+                            .get()
+                            .context("Failed to acquire database connection")?;
+
+                        for record in message.records {
+                            let region = record.aws_region;
+                            let bucket = record.s3.bucket.name;
+                            let path = record.s3.object.key;
+
+                            let path = match object_store::path::Path::from_url_path(&path) {
+                                Ok(path) => path,
+                                Err(err) => {
+                                    warn!("Failed to parse path ({path}): {err}");
+                                    continue;
+                                }
+                            };
+
+                            info!("Enqueuing processing job for message {message_id}… ({path})");
+                            let job = ProcessCdnLog::new(region, bucket, path.as_ref().to_owned());
+
+                            job.enqueue(&mut conn).with_context(|| {
+                                format!("Failed to enqueue processing job for message {message_id}")
+                            })?;
+
+                            debug!("Enqueued processing job for message {message_id}");
+                        }
+
+                        Ok::<_, anyhow::Error>(())
+                    }
+                })
+                .await?;
+
+                debug!("Processed message: {message_id}");
+            }
+        }
+
+        Ok(())
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use aws_sdk_sqs::operation::receive_message::builders::ReceiveMessageOutputBuilder;
+    use aws_sdk_sqs::types::builders::MessageBuilder;
+    use aws_sdk_sqs::types::Message;
+    use crates_io_test_db::TestDatabase;
+    use crates_io_worker::schema::background_jobs;
+    use diesel::prelude::*;
+    use diesel::r2d2::{ConnectionManager, Pool};
+    use diesel::QueryDsl;
+    use insta::assert_snapshot;
+    use parking_lot::Mutex;
+
+    #[tokio::test]
+    async fn test_process_cdn_log() {
+        let _guard = crate::util::tracing::init_for_test();
+
+        let path = "cloudfront/index.staging.crates.io/E35K556QRQDZXW.2024-01-16-16.d01d5f13.gz";
+
+        let job = ProcessCdnLog::new(
+            "us-west-1".to_string(),
+            "bucket".to_string(),
+            path.to_string(),
+        );
+
+        let config = CdnLogStorageConfig::memory();
+        let store = assert_ok!(job.build_store(&config));
+
+        // Add dummy data into the store
+        {
+            let bytes =
+                include_bytes!("../../../crates_io_cdn_logs/test_data/cloudfront/basic.log.gz");
+
+            store.put(&path.into(), bytes[..].into()).await.unwrap();
+        }
+
+        assert_ok!(job.run(store).await);
+    }
+
+    #[tokio::test]
+    async fn test_s3_builder() {
+        let path = "cloudfront/index.staging.crates.io/E35K556QRQDZXW.2024-01-16-16.d01d5f13.gz";
+
+        let job = ProcessCdnLog::new(
+            "us-west-1".to_string(),
+            "bucket".to_string(),
+            path.to_string(),
+        );
+
+        let access_key = "access_key".into();
+        let secret_key = "secret_key".to_string().into();
+        let config = CdnLogStorageConfig::s3(access_key, secret_key);
+        assert_ok!(job.build_store(&config));
+    }
+
+    #[tokio::test]
+    async fn test_process_cdn_log_queue() {
+        let _guard = crate::util::tracing::init_for_test();
+
+        let mut queue = Box::new(MockSqsQueue::new());
+        queue
+            .expect_receive_messages()
+            .once()
+            .returning(|_max_messages| {
+                Ok(ReceiveMessageOutputBuilder::default()
+                    .messages(message("123", "us-west-1", "bucket", "path"))
+                    .build())
+            });
+
+        queue
+            .expect_receive_messages()
+            .once()
+            .returning(|_max_messages| Ok(ReceiveMessageOutputBuilder::default().build()));
+
+        let deleted_handles = record_deleted_handles(&mut queue);
+
+        let test_database = TestDatabase::new();
+        let connection_pool = build_connection_pool(test_database.url());
+
+        let job = ProcessCdnLogQueue { max_messages: 100 };
+        assert_ok!(job.run(queue, &connection_pool).await);
+
+        assert_snapshot!(deleted_handles.lock().join(","), @"123");
+        assert_snapshot!(open_jobs(&mut test_database.connect()), @"us-west-1 | bucket | path");
+    }
+
+    #[tokio::test]
+    async fn test_process_cdn_log_queue_multi_page() {
+        let _guard = crate::util::tracing::init_for_test();
+
+        let mut queue = Box::new(MockSqsQueue::new());
+        queue
+            .expect_receive_messages()
+            .once()
+            .returning(|_max_messages| {
+                Ok(ReceiveMessageOutputBuilder::default()
+                    .messages(message("1", "us-west-1", "bucket", "path1"))
+                    .messages(message("2", "us-west-1", "bucket", "path2"))
+                    .messages(message("3", "us-west-1", "bucket", "path3"))
+                    .messages(message("4", "us-west-1", "bucket", "path4"))
+                    .messages(message("5", "us-west-1", "bucket", "path5"))
+                    .messages(message("6", "us-west-1", "bucket", "path6"))
+                    .messages(message("7", "us-west-1", "bucket", "path7"))
+                    .messages(message("8", "us-west-1", "bucket", "path8"))
+                    .messages(message("9", "us-west-1", "bucket", "path9"))
+                    .messages(message("10", "us-west-1", "bucket", "path10"))
+                    .build())
+            });
+
+        queue
+            .expect_receive_messages()
+            .once()
+            .returning(|_max_messages| {
+                Ok(ReceiveMessageOutputBuilder::default()
+                    .messages(message("11", "us-west-1", "bucket", "path11"))
+                    .build())
+            });
+
+        queue
+            .expect_receive_messages()
+            .once()
+            .returning(|_max_messages| Ok(ReceiveMessageOutputBuilder::default().build()));
+
+        let deleted_handles = record_deleted_handles(&mut queue);
+
+        let test_database = TestDatabase::new();
+        let connection_pool = build_connection_pool(test_database.url());
+
+        let job = ProcessCdnLogQueue { max_messages: 100 };
+        assert_ok!(job.run(queue, &connection_pool).await);
+
+        assert_snapshot!(deleted_handles.lock().join(","), @"1,2,3,4,5,6,7,8,9,10,11");
+        assert_snapshot!(open_jobs(&mut test_database.connect()), @r###"
+        us-west-1 | bucket | path1
+        us-west-1 | bucket | path2
+        us-west-1 | bucket | path3
+        us-west-1 | bucket | path4
+        us-west-1 | bucket | path5
+        us-west-1 | bucket | path6
+        us-west-1 | bucket | path7
+        us-west-1 | bucket | path8
+        us-west-1 | bucket | path9
+        us-west-1 | bucket | path10
+        us-west-1 | bucket | path11
+        "###);
+    }
+
+    #[tokio::test]
+    async fn test_process_cdn_log_queue_parse_error() {
+        let _guard = crate::util::tracing::init_for_test();
+
+        let mut queue = Box::new(MockSqsQueue::new());
+        queue
+            .expect_receive_messages()
+            .once()
+            .returning(|_max_messages| {
+                let message = MessageBuilder::default()
+                    .message_id("1")
+                    .receipt_handle("1")
+                    .body(serde_json::to_string("{}").unwrap())
+                    .build();
+
+                Ok(ReceiveMessageOutputBuilder::default()
+                    .messages(message)
+                    .build())
+            });
+
+        queue
+            .expect_receive_messages()
+            .once()
+            .returning(|_max_messages| Ok(ReceiveMessageOutputBuilder::default().build()));
+
+        let deleted_handles = record_deleted_handles(&mut queue);
+
+        let test_database = TestDatabase::new();
+        let connection_pool = build_connection_pool(test_database.url());
+
+        let job = ProcessCdnLogQueue { max_messages: 100 };
+        assert_ok!(job.run(queue, &connection_pool).await);
+
+        assert_snapshot!(deleted_handles.lock().join(","), @"1");
+        assert_snapshot!(open_jobs(&mut test_database.connect()), @"");
+    }
+
+    fn record_deleted_handles(queue: &mut MockSqsQueue) -> Arc<Mutex<Vec<String>>> {
+        let deleted_handles = Arc::new(Mutex::new(vec![]));
+
+        queue.expect_delete_message().returning({
+            let deleted_handles = deleted_handles.clone();
+            move |receipt_handle| {
+                deleted_handles.lock().push(receipt_handle.to_owned());
+                Ok(())
+            }
+        });
+
+        deleted_handles
+    }
+
+    fn build_connection_pool(url: &str) -> DieselPool {
+        let pool = Pool::builder().build(ConnectionManager::new(url)).unwrap();
+        DieselPool::new_background_worker(pool)
+    }
+
+    fn message(id: &str, region: &str, bucket: &str, path: &str) -> Message {
+        let json = json!({
+            "Records": [{
+                "awsRegion": region,
+                "s3": {
+                    "bucket": { "name": bucket },
+                    "object": { "key": path },
+                }
+            }]
+        });
+
+        MessageBuilder::default()
+            .message_id(id)
+            .receipt_handle(id)
+            .body(serde_json::to_string(&json).unwrap())
+            .build()
+    }
+
+    fn open_jobs(conn: &mut PgConnection) -> String {
+        let jobs = background_jobs::table
+            .select((background_jobs::job_type, background_jobs::data))
+            .load::<(String, serde_json::Value)>(conn)
+            .unwrap();
+
+        jobs.into_iter()
+            .inspect(|(job_type, _data)| assert_eq!(job_type, ProcessCdnLog::JOB_NAME))
+            .map(|(_job_type, data)| data)
+            .map(|data| serde_json::from_value::<ProcessCdnLog>(data).unwrap())
+            .map(|job| format!("{} | {} | {}", job.region, job.bucket, job.path))
+            .collect::<Vec<_>>()
+            .join("\n")
+    }
+}
diff --git a/src/worker/jobs/downloads/fixtures/empty-event.json b/src/worker/jobs/downloads/fixtures/empty-event.json
new file mode 100644
index 00000000000..d0e9e481b05
--- /dev/null
+++ b/src/worker/jobs/downloads/fixtures/empty-event.json
@@ -0,0 +1,3 @@
+{
+  "Records": []
+}
diff --git a/src/worker/jobs/downloads/fixtures/multi-event.json b/src/worker/jobs/downloads/fixtures/multi-event.json
new file mode 100644
index 00000000000..ea112c63c98
--- /dev/null
+++ b/src/worker/jobs/downloads/fixtures/multi-event.json
@@ -0,0 +1,72 @@
+{
+  "Records": [
+    {
+      "eventVersion": "2.1",
+      "eventSource": "aws:s3",
+      "awsRegion": "us-west-1",
+      "eventTime": "2024-01-16T16:24:06.554Z",
+      "eventName": "ObjectCreated:Put",
+      "userIdentity": {
+        "principalId": "AWS:AROASLZSKMR5WNLFCYRXL:prod.iad.dbs.datafeeds.aws.internal"
+      },
+      "requestParameters": {
+        "sourceIPAddress": "72.21.217.31"
+      },
+      "responseElements": {
+        "x-amz-request-id": "WK0MBXBCXJ4Y1KDX",
+        "x-amz-id-2": "Rwojcx6M4ZoUCE8gETkqTmOPO2zKbF7AFeZshk1/nYxdtng47pBPRwsSxbscaQTZu2PVbT9w+u9LTq2KVsoleencxqJ3vCex"
+      },
+      "s3": {
+        "s3SchemaVersion": "1.0",
+        "configurationId": "cloudfront",
+        "bucket": {
+          "name": "rust-staging-crates-io-logs",
+          "ownerIdentity": {
+            "principalId": "A266IOH8VA1FN7"
+          },
+          "arn": "arn:aws:s3:::rust-staging-crates-io-logs"
+        },
+        "object": {
+          "key": "cloudfront/index.staging.crates.io/E35K556QRQDZXW.2024-01-16-16.d01d5f13.gz",
+          "size": 710,
+          "eTag": "4c8266e632a8f1ed489c19821680da6e",
+          "sequencer": "0065A6ADA6765BF0F0"
+        }
+      }
+    },
+    {
+      "eventVersion": "2.1",
+      "eventSource": "aws:s3",
+      "awsRegion": "us-west-1",
+      "eventTime": "2024-01-16T16:24:06.554Z",
+      "eventName": "ObjectCreated:Put",
+      "userIdentity": {
+        "principalId": "AWS:AROASLZSKMR5WNLFCYRXL:prod.iad.dbs.datafeeds.aws.internal"
+      },
+      "requestParameters": {
+        "sourceIPAddress": "72.21.217.31"
+      },
+      "responseElements": {
+        "x-amz-request-id": "WK0MBXBCXJ4Y1KDX",
+        "x-amz-id-2": "Rwojcx6M4ZoUCE8gETkqTmOPO2zKbF7AFeZshk1/nYxdtng47pBPRwsSxbscaQTZu2PVbT9w+u9LTq2KVsoleencxqJ3vCex"
+      },
+      "s3": {
+        "s3SchemaVersion": "1.0",
+        "configurationId": "cloudfront",
+        "bucket": {
+          "name": "rust-staging-crates-io-logs",
+          "ownerIdentity": {
+            "principalId": "A266IOH8VA1FN7"
+          },
+          "arn": "arn:aws:s3:::rust-staging-crates-io-logs"
+        },
+        "object": {
+          "key": "cloudfront/index.staging.crates.io/E35K556QRQDZXW.2024-01-16-16.d01d5f13.gz",
+          "size": 710,
+          "eTag": "4c8266e632a8f1ed489c19821680da6e",
+          "sequencer": "0065A6ADA6765BF0F0"
+        }
+      }
+    }
+  ]
+}
diff --git a/src/worker/jobs/downloads/fixtures/valid-event.json b/src/worker/jobs/downloads/fixtures/valid-event.json new file mode 100644 index 00000000000..05bcfca4a13 --- /dev/null +++ b/src/worker/jobs/downloads/fixtures/valid-event.json @@ -0,0 +1,38 @@ +{ + "Records": [ + { + "eventVersion": "2.1", + "eventSource": "aws:s3", + "awsRegion": "us-west-1", + "eventTime": "2024-01-16T16:24:06.554Z", + "eventName": "ObjectCreated:Put", + "userIdentity": { + "principalId": "AWS:AROASLZSKMR5WNLFCYRXL:prod.iad.dbs.datafeeds.aws.internal" + }, + "requestParameters": { + "sourceIPAddress": "72.21.217.31" + }, + "responseElements": { + "x-amz-request-id": "WK0MBXBCXJ4Y1KDX", + "x-amz-id-2": "Rwojcx6M4ZoUCE8gETkqTmOPO2zKbF7AFeZshk1/nYxdtng47pBPRwsSxbscaQTZu2PVbT9w+u9LTq2KVsoleencxqJ3vCex" + }, + "s3": { + "s3SchemaVersion": "1.0", + "configurationId": "cloudfront", + "bucket": { + "name": "rust-staging-crates-io-logs", + "ownerIdentity": { + "principalId": "A266IOH8VA1FN7" + }, + "arn": "arn:aws:s3:::rust-staging-crates-io-logs" + }, + "object": { + "key": "cloudfront/index.staging.crates.io/E35K556QRQDZXW.2024-01-16-16.d01d5f13.gz", + "size": 710, + "eTag": "4c8266e632a8f1ed489c19821680da6e", + "sequencer": "0065A6ADA6765BF0F0" + } + } + } + ] +} diff --git a/src/worker/jobs/downloads/message.rs b/src/worker/jobs/downloads/message.rs new file mode 100644 index 00000000000..428bd171eaf --- /dev/null +++ b/src/worker/jobs/downloads/message.rs @@ -0,0 +1,56 @@ +use std::str::FromStr; + +#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)] +pub struct Object { + pub key: String, +} + +#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)] +pub struct Bucket { + pub name: String, +} + +#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)] +pub struct S3 { + pub bucket: Bucket, + pub object: Object, +} + +#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)] +pub struct Record { + #[serde(rename = "awsRegion")] + pub aws_region: String, + pub s3: S3, +} + +#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)] +pub struct Message { + #[serde(rename = "Records")] + pub records: Vec, +} + +impl FromStr for Message { + type Err = serde_json::Error; + + fn from_str(s: &str) -> Result { + serde_json::from_str(s) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use insta::assert_debug_snapshot; + + #[test] + fn test_parse() { + let event = assert_ok!(include_str!("./fixtures/empty-event.json").parse::()); + assert_debug_snapshot!(event); + + let event = assert_ok!(include_str!("./fixtures/valid-event.json").parse::()); + assert_debug_snapshot!(event); + + let event = assert_ok!(include_str!("./fixtures/multi-event.json").parse::()); + assert_debug_snapshot!(event); + } +} diff --git a/src/worker/jobs/downloads/snapshots/crates_io__worker__jobs__downloads__message__tests__parse-2.snap b/src/worker/jobs/downloads/snapshots/crates_io__worker__jobs__downloads__message__tests__parse-2.snap new file mode 100644 index 00000000000..721bb3e2fcd --- /dev/null +++ b/src/worker/jobs/downloads/snapshots/crates_io__worker__jobs__downloads__message__tests__parse-2.snap @@ -0,0 +1,19 @@ +--- +source: src/worker/jobs/downloads/message.rs +expression: event +--- +Message { + records: [ + Record { + aws_region: "us-west-1", + s3: S3 { + bucket: Bucket { + name: "rust-staging-crates-io-logs", + }, + object: Object { + key: "cloudfront/index.staging.crates.io/E35K556QRQDZXW.2024-01-16-16.d01d5f13.gz", + }, + }, + }, + ], +} diff --git 
diff --git a/src/worker/jobs/downloads/snapshots/crates_io__worker__jobs__downloads__message__tests__parse-3.snap b/src/worker/jobs/downloads/snapshots/crates_io__worker__jobs__downloads__message__tests__parse-3.snap
new file mode 100644
index 00000000000..f3891bdb86a
--- /dev/null
+++ b/src/worker/jobs/downloads/snapshots/crates_io__worker__jobs__downloads__message__tests__parse-3.snap
@@ -0,0 +1,30 @@
+---
+source: src/worker/jobs/downloads/message.rs
+expression: event
+---
+Message {
+    records: [
+        Record {
+            aws_region: "us-west-1",
+            s3: S3 {
+                bucket: Bucket {
+                    name: "rust-staging-crates-io-logs",
+                },
+                object: Object {
+                    key: "cloudfront/index.staging.crates.io/E35K556QRQDZXW.2024-01-16-16.d01d5f13.gz",
+                },
+            },
+        },
+        Record {
+            aws_region: "us-west-1",
+            s3: S3 {
+                bucket: Bucket {
+                    name: "rust-staging-crates-io-logs",
+                },
+                object: Object {
+                    key: "cloudfront/index.staging.crates.io/E35K556QRQDZXW.2024-01-16-16.d01d5f13.gz",
+                },
+            },
+        },
+    ],
+}
diff --git a/src/worker/jobs/downloads/snapshots/crates_io__worker__jobs__downloads__message__tests__parse.snap b/src/worker/jobs/downloads/snapshots/crates_io__worker__jobs__downloads__message__tests__parse.snap
new file mode 100644
index 00000000000..d1b311a050a
--- /dev/null
+++ b/src/worker/jobs/downloads/snapshots/crates_io__worker__jobs__downloads__message__tests__parse.snap
@@ -0,0 +1,7 @@
+---
+source: src/worker/jobs/downloads/message.rs
+expression: event
+---
+Message {
+    records: [],
+}
diff --git a/src/worker/jobs/mod.rs b/src/worker/jobs/mod.rs
index 916bd160222..b2656f6908b 100644
--- a/src/worker/jobs/mod.rs
+++ b/src/worker/jobs/mod.rs
@@ -6,6 +6,7 @@ use diesel::sql_types::{Int2, Jsonb, Text};
 use std::fmt::Display;
 
 mod daily_db_maintenance;
+mod downloads;
 pub mod dump_db;
 mod git;
 mod readmes;
@@ -14,6 +15,7 @@ mod typosquat;
 mod update_downloads;
 
 pub use self::daily_db_maintenance::DailyDbMaintenance;
+pub use self::downloads::{ProcessCdnLog, ProcessCdnLogQueue};
 pub use self::dump_db::DumpDb;
 pub use self::git::{NormalizeIndex, SquashIndex, SyncToGitIndex, SyncToSparseIndex};
 pub use self::readmes::RenderAndUploadReadme;
diff --git a/src/worker/mod.rs b/src/worker/mod.rs
index cfe4f5675ff..08584b27cdd 100644
--- a/src/worker/mod.rs
+++ b/src/worker/mod.rs
@@ -23,6 +23,8 @@ impl RunnerExt for Runner<Arc<Environment>> {
             .register_job_type::<DailyDbMaintenance>()
             .register_job_type::<DumpDb>()
            .register_job_type::<NormalizeIndex>()
+            .register_job_type::<ProcessCdnLog>()
+            .register_job_type::<ProcessCdnLogQueue>()
             .register_job_type::<RenderAndUploadReadme>()
             .register_job_type::<SquashIndex>()
             .register_job_type::<SyncAdmins>()
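
Taken together, the wiring is: `CdnLogQueueConfig::from_env()` decides between the real SQS queue and the mock, `ProcessCdnLogQueue` drains the queue and enqueues one `ProcessCdnLog` job per S3 record, and `ProcessCdnLog` reads the log file from `CdnLogStorageConfig`-backed storage. The snippet below is a minimal sketch of driving the new `sqs` module directly; it is not part of this diff, and the standalone `#[tokio::main]` entry point is only scaffolding for illustration. It uses the same `CDN_LOG_QUEUE_*` variables added to `.env.sample` above.

use anyhow::Result;
use aws_credential_types::Credentials;
use aws_sdk_sqs::config::Region;
use crates_io::config::CdnLogQueueConfig;
use crates_io::sqs::{SqsQueue, SqsQueueImpl};
use secrecy::ExposeSecret;

#[tokio::main]
async fn main() -> Result<()> {
    // Resolve the queue configuration from the environment, mirroring
    // ProcessCdnLogQueue::build_queue() above.
    let CdnLogQueueConfig::SQS { access_key, secret_key, queue_url, region } =
        CdnLogQueueConfig::from_env()?
    else {
        anyhow::bail!("CDN_LOG_QUEUE_URL is not set; falling back to the mock queue");
    };

    let credentials = Credentials::from_keys(access_key, secret_key.expose_secret(), None);
    let queue = SqsQueueImpl::new(queue_url, Region::new(region), credentials);

    // Receive up to 10 messages (the SQS per-request maximum used as
    // MAX_BATCH_SIZE above) and delete each one after handling it.
    let response = queue.receive_messages(10).await?;
    for message in response.messages() {
        if let Some(receipt_handle) = message.receipt_handle() {
            queue.delete_message(receipt_handle).await?;
        }
    }

    Ok(())
}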
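
Because `#[automock]` generates `MockSqsQueue`, any consumer of the trait can be unit-tested without touching AWS, which is how the tests in `downloads.rs` work. A stripped-down sketch of the same pattern (a hypothetical test, not part of this diff):

use aws_sdk_sqs::operation::receive_message::builders::ReceiveMessageOutputBuilder;
use crates_io::sqs::{MockSqsQueue, SqsQueue};

#[tokio::test]
async fn empty_queue_returns_no_messages() {
    let mut queue = MockSqsQueue::new();

    // Expect exactly one receive call and hand back an empty batch,
    // mirroring the "no more messages" case in ProcessCdnLogQueue::run().
    queue
        .expect_receive_messages()
        .once()
        .returning(|_max_messages| Ok(ReceiveMessageOutputBuilder::default().build()));

    let response = queue.receive_messages(10).await.unwrap();
    assert!(response.messages().is_empty());
}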