Skip to content

Commit 348c52b

Browse files
authored
Merge pull request #8036 from Turbo87/queue-jobs
Implement `ProcessCdnLog` and `ProcessCdnLogQueue` background jobs
2 parents 1f4abb9 + c27b101 commit 348c52b

24 files changed

+990 -3 lines changed

.env.sample

Lines changed: 7 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -45,6 +45,13 @@ export TEST_DATABASE_URL=
4545
# Uses AWS credentials.
4646
# export CLOUDFRONT_DISTRIBUTION=
4747

48+
# Configuration for the CDN log queue. You can leave these commented out if
49+
# you're not using the CDN log queue.
50+
# export CDN_LOG_QUEUE_ACCESS_KEY=
51+
# export CDN_LOG_QUEUE_SECRET_KEY=
52+
# export CDN_LOG_QUEUE_URL=
53+
# export CDN_LOG_QUEUE_REGION=
54+
4855
# Upstream location of the registry index. Background jobs will push to
4956
# this URL. The default points to a local index for development.
5057
# Run `./script/init-local-index.sh` to initialize this repo.

Cargo.lock

Lines changed: 24 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -52,11 +52,13 @@ async-trait = "=0.1.77"
5252
aws-credential-types = { version = "=1.1.4", features = ["hardcoded-credentials"] }
5353
aws-ip-ranges = "=0.90.0"
5454
aws-sdk-cloudfront = "=1.12.0"
55+
aws-sdk-sqs = "=1.12.0"
5556
axum = { version = "=0.7.4", features = ["macros", "matched-path"] }
5657
axum-extra = { version = "=0.9.2", features = ["cookie-signed", "typed-header"] }
5758
base64 = "=0.21.7"
5859
bigdecimal = "=0.4.2"
5960
cargo-manifest = "=0.13.0"
61+
crates_io_cdn_logs = { path = "crates_io_cdn_logs" }
6062
crates_io_env_vars = { path = "crates_io_env_vars" }
6163
crates_io_github = { path = "crates_io_github" }
6264
crates_io_index = { path = "crates_io_index" }

src/admin/enqueue_job.rs

Lines changed: 4 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -31,6 +31,7 @@ pub enum Command {
3131
#[arg()]
3232
name: String,
3333
},
34+
ProcessCdnLogQueue(jobs::ProcessCdnLogQueue),
3435
SyncAdmins {
3536
/// Force a sync even if one is already in progress
3637
#[arg(long)]
@@ -89,6 +90,9 @@ pub fn run(command: Command) -> Result<()> {
8990
Command::DailyDbMaintenance => {
9091
jobs::DailyDbMaintenance.enqueue(conn)?;
9192
}
93+
Command::ProcessCdnLogQueue(job) => {
94+
job.enqueue(conn)?;
95+
}
9296
Command::SquashIndex => {
9397
jobs::SquashIndex.enqueue(conn)?;
9498
}

src/app.rs

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -34,7 +34,7 @@ pub struct App {
3434
pub github_oauth: BasicClient,
3535

3636
/// The server configuration
37-
pub config: config::Server,
37+
pub config: Arc<config::Server>,
3838

3939
/// Cache the `version_id` of a `canonical_crate_name:semver` pair
4040
///
@@ -158,7 +158,7 @@ impl App {
158158
instance_metrics,
159159
balance_capacity: Default::default(),
160160
rate_limiter: RateLimiter::new(config.rate_limiter.clone()),
161-
config,
161+
config: Arc::new(config),
162162
}
163163
}
164164

src/bin/background-worker.rs

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -86,6 +86,7 @@ fn main() -> anyhow::Result<()> {
8686
.build_unchecked(ConnectionManager::new(db_url));
8787

8888
let environment = Environment::builder()
89+
.config(Arc::new(config))
8990
.repository_config(repository_config)
9091
.cloudfront(cloudfront)
9192
.fastly(fastly)

src/config.rs

Lines changed: 4 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -1,11 +1,15 @@
11
mod balance_capacity;
22
mod base;
3+
mod cdn_log_queue;
4+
mod cdn_log_storage;
35
mod database_pools;
46
mod sentry;
57
mod server;
68

79
pub use self::balance_capacity::BalanceCapacityConfig;
810
pub use self::base::Base;
11+
pub use self::cdn_log_queue::CdnLogQueueConfig;
12+
pub use self::cdn_log_storage::CdnLogStorageConfig;
913
pub use self::database_pools::{DatabasePools, DbPoolConfig};
1014
pub use self::sentry::SentryConfig;
1115
pub use self::server::Server;

src/config/cdn_log_queue.rs

Lines changed: 33 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,33 @@
1+
use crates_io_env_vars::{required_var, var};
2+
use secrecy::SecretString;
3+
4+
#[derive(Debug, Clone)]
5+
pub enum CdnLogQueueConfig {
6+
SQS {
7+
access_key: String,
8+
secret_key: SecretString,
9+
queue_url: String,
10+
region: String,
11+
},
12+
Mock,
13+
}
14+
15+
impl CdnLogQueueConfig {
16+
pub fn from_env() -> anyhow::Result<Self> {
17+
if let Some(queue_url) = var("CDN_LOG_QUEUE_URL")? {
18+
let access_key = required_var("CDN_LOG_QUEUE_ACCESS_KEY")?;
19+
let secret_key = required_var("CDN_LOG_QUEUE_SECRET_KEY")?.into();
20+
let region = required_var("CDN_LOG_QUEUE_REGION")?;
21+
22+
return Ok(Self::SQS {
23+
access_key,
24+
secret_key,
25+
queue_url,
26+
region,
27+
});
28+
}
29+
30+
warn!("Falling back to mocked CDN log queue");
31+
Ok(Self::Mock)
32+
}
33+
}

src/config/cdn_log_storage.rs

Lines changed: 53 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,53 @@
1+
use anyhow::Context;
2+
use crates_io_env_vars::{required_var, var};
3+
use secrecy::SecretString;
4+
use std::path::PathBuf;
5+
6+
#[derive(Debug, Clone)]
7+
pub enum CdnLogStorageConfig {
8+
S3 {
9+
access_key: String,
10+
secret_key: SecretString,
11+
},
12+
Local {
13+
path: PathBuf,
14+
},
15+
Memory,
16+
}
17+
18+
impl CdnLogStorageConfig {
19+
pub fn s3(access_key: String, secret_key: SecretString) -> Self {
20+
Self::S3 {
21+
access_key,
22+
secret_key,
23+
}
24+
}
25+
26+
pub fn local(path: PathBuf) -> Self {
27+
Self::Local { path }
28+
}
29+
30+
pub fn memory() -> Self {
31+
Self::Memory
32+
}
33+
34+
pub fn from_env() -> anyhow::Result<Self> {
35+
if let Some(access_key) = var("AWS_ACCESS_KEY")? {
36+
let secret_key = required_var("AWS_SECRET_KEY")?.into();
37+
return Ok(Self::s3(access_key, secret_key));
38+
}
39+
40+
let current_dir = std::env::current_dir();
41+
let current_dir = current_dir.context("Failed to read the current directory")?;
42+
43+
let path = current_dir.join("local_uploads");
44+
let path_display = path.display();
45+
if path.exists() {
46+
info!("Falling back to local CDN log storage at {path_display}");
47+
return Ok(Self::local(path));
48+
}
49+
50+
warn!("Falling back to in-memory CDN log storage because {path_display} does not exist");
51+
Ok(Self::memory())
52+
}
53+
}

src/config/server.rs

Lines changed: 6 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -8,6 +8,8 @@ use crate::Env;
88
use super::base::Base;
99
use super::database_pools::DatabasePools;
1010
use crate::config::balance_capacity::BalanceCapacityConfig;
11+
use crate::config::cdn_log_storage::CdnLogStorageConfig;
12+
use crate::config::CdnLogQueueConfig;
1113
use crate::middleware::cargo_compat::StatusCodeConfig;
1214
use crate::storage::StorageConfig;
1315
use crates_io_env_vars::{list, list_parsed, required_var, var, var_parsed};
@@ -34,6 +36,8 @@ pub struct Server {
3436
pub max_blocking_threads: Option<usize>,
3537
pub db: DatabasePools,
3638
pub storage: StorageConfig,
39+
pub cdn_log_storage: CdnLogStorageConfig,
40+
pub cdn_log_queue: CdnLogQueueConfig,
3741
pub session_key: cookie::Key,
3842
pub gh_client_id: ClientId,
3943
pub gh_client_secret: ClientSecret,
@@ -172,6 +176,8 @@ impl Server {
172176
Ok(Server {
173177
db: DatabasePools::full_from_environment(&base)?,
174178
storage,
179+
cdn_log_storage: CdnLogStorageConfig::from_env()?,
180+
cdn_log_queue: CdnLogQueueConfig::from_env()?,
175181
base,
176182
ip,
177183
port,

src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -53,6 +53,7 @@ mod router;
5353
pub mod schema;
5454
pub mod sentry;
5555
pub mod sql;
56+
pub mod sqs;
5657
pub mod ssh;
5758
pub mod storage;
5859
pub mod tasks;

src/sqs.rs

Lines changed: 94 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,94 @@
1+
use anyhow::Context;
2+
use async_trait::async_trait;
3+
use aws_credential_types::Credentials;
4+
use aws_sdk_sqs::config::{BehaviorVersion, Region};
5+
use aws_sdk_sqs::operation::receive_message::ReceiveMessageOutput;
6+
use mockall::automock;
7+
8+
/// The [SqsQueue] trait defines a basic interface for interacting with an
9+
/// AWS SQS queue.
10+
///
11+
/// A [MockSqsQueue] struct is automatically generated by the [automock]
12+
/// attribute. This struct can be used in unit tests to mock the behavior of
13+
/// the [SqsQueue] trait.
14+
///
15+
/// The [SqsQueueImpl] struct is the actual implementation of the trait.
16+
#[automock]
17+
#[async_trait]
18+
pub trait SqsQueue {
19+
async fn receive_messages(&self, max_messages: i32) -> anyhow::Result<ReceiveMessageOutput>;
20+
async fn delete_message(&self, receipt_handle: &str) -> anyhow::Result<()>;
21+
}
22+
23+
/// The [SqsQueueImpl] struct is the actual implementation of the [SqsQueue]
24+
/// trait, which interacts with the real AWS API servers.
25+
#[derive(Debug, Clone)]
26+
pub struct SqsQueueImpl {
27+
client: aws_sdk_sqs::Client,
28+
queue_url: String,
29+
}
30+
31+
impl SqsQueueImpl {
32+
pub fn new(queue_url: impl Into<String>, region: Region, credentials: Credentials) -> Self {
33+
let config = aws_sdk_sqs::Config::builder()
34+
.credentials_provider(credentials)
35+
.region(region)
36+
.behavior_version(BehaviorVersion::v2023_11_09())
37+
.build();
38+
39+
let client = aws_sdk_sqs::Client::from_conf(config);
40+
let queue_url = queue_url.into();
41+
42+
SqsQueueImpl { client, queue_url }
43+
}
44+
}
45+
46+
#[async_trait]
47+
impl SqsQueue for SqsQueueImpl {
48+
async fn receive_messages(&self, max_messages: i32) -> anyhow::Result<ReceiveMessageOutput> {
49+
let response = self
50+
.client
51+
.receive_message()
52+
.max_number_of_messages(max_messages)
53+
.queue_url(&self.queue_url)
54+
.send()
55+
.await
56+
.context("Failed to receive SQS queue message")?;
57+
58+
Ok(response)
59+
}
60+
61+
async fn delete_message(&self, receipt_handle: &str) -> anyhow::Result<()> {
62+
self.client
63+
.delete_message()
64+
.receipt_handle(receipt_handle)
65+
.queue_url(&self.queue_url)
66+
.send()
67+
.await
68+
.context("Failed to delete SQS queue message")?;
69+
70+
Ok(())
71+
}
72+
}
73+
74+
#[cfg(test)]
75+
mod tests {
76+
use super::*;
77+
78+
#[test]
79+
fn test_constructor() {
80+
let credentials = Credentials::new(
81+
"ANOTREAL",
82+
"notrealrnrELgWzOk3IfjzDKtFBhDby",
83+
None,
84+
None,
85+
"test",
86+
);
87+
88+
let queue_url = "https://sqs.us-west-1.amazonaws.com/359172468976/cdn-log-event-queue";
89+
let region = Region::new("us-west-1");
90+
91+
// Check that `SqsQueueImpl::new()` does not panic.
92+
let _queue = SqsQueueImpl::new(queue_url, region, credentials);
93+
}
94+
}

src/tests/util/test_app.rs

Lines changed: 7 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -2,7 +2,10 @@ use super::{MockAnonymousUser, MockCookieUser, MockTokenUser};
22
use crate::util::chaosproxy::ChaosProxy;
33
use crate::util::github::{MockGitHubClient, MOCK_GITHUB_DATA};
44
use anyhow::Context;
5-
use crates_io::config::{self, BalanceCapacityConfig, Base, DatabasePools, DbPoolConfig};
5+
use crates_io::config::{
6+
self, BalanceCapacityConfig, Base, CdnLogQueueConfig, CdnLogStorageConfig, DatabasePools,
7+
DbPoolConfig,
8+
};
69
use crates_io::middleware::cargo_compat::StatusCodeConfig;
710
use crates_io::models::token::{CrateScope, EndpointScope};
811
use crates_io::rate_limiter::{LimitedAction, RateLimiterConfig};
@@ -270,6 +273,7 @@ impl TestAppBuilder {
270273
};
271274

272275
let environment = Environment::builder()
276+
.config(app.config.clone())
273277
.repository_config(repository_config)
274278
.storage(app.storage.clone())
275279
.connection_pool(app.primary_database.clone())
@@ -421,6 +425,8 @@ fn simple_config() -> config::Server {
421425
max_blocking_threads: None,
422426
db,
423427
storage,
428+
cdn_log_queue: CdnLogQueueConfig::Mock,
429+
cdn_log_storage: CdnLogStorageConfig::memory(),
424430
session_key: cookie::Key::derive_from("test this has to be over 32 bytes long".as_bytes()),
425431
gh_client_id: ClientId::new(dotenvy::var("GH_CLIENT_ID").unwrap_or_default()),
426432
gh_client_secret: ClientSecret::new(dotenvy::var("GH_CLIENT_SECRET").unwrap_or_default()),

src/worker/environment.rs

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -16,6 +16,8 @@ use std::time::Instant;
1616
#[derive(Builder)]
1717
#[builder(pattern = "owned")]
1818
pub struct Environment {
19+
pub config: Arc<crate::config::Server>,
20+
1921
repository_config: RepositoryConfig,
2022
#[builder(default, setter(skip))]
2123
repository: Mutex<Option<Repository>>,

0 commit comments

Comments
 (0)