Skip to content

Commit 0fb7050

Browse files
committed
Implement ProcessCdnLogQueue background job
1 parent e3bcd00 commit 0fb7050

14 files changed

+405
-1
lines changed

.env.sample

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,14 @@ export TEST_DATABASE_URL=
4545
# Uses AWS credentials.
4646
# export CLOUDFRONT_DISTRIBUTION=
4747

48+
49+
# Configuration for the CDN log queue. You can leave these commented out if
50+
# you're not using the CDN log queue.
51+
# export CDN_LOG_QUEUE_ACCESS_KEY=
52+
# export CDN_LOG_QUEUE_SECRET_KEY=
53+
# export CDN_LOG_QUEUE_URL=
54+
# export CDN_LOG_QUEUE_REGION=
55+
4856
# Upstream location of the registry index. Background jobs will push to
4957
# this URL. The default points to a local index for development.
5058
# Run `./script/init-local-index.sh` to initialize this repo.

Cargo.lock

Lines changed: 23 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ async-trait = "=0.1.77"
5252
aws-credential-types = { version = "=1.1.4", features = ["hardcoded-credentials"] }
5353
aws-ip-ranges = "=0.90.0"
5454
aws-sdk-cloudfront = "=1.12.0"
55+
aws-sdk-sqs = "=1.12.0"
5556
axum = { version = "=0.7.4", features = ["macros", "matched-path"] }
5657
axum-extra = { version = "=0.9.2", features = ["cookie-signed", "typed-header"] }
5758
base64 = "=0.21.7"

src/admin/enqueue_job.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ pub enum Command {
3131
#[arg()]
3232
name: String,
3333
},
34+
ProcessCdnLogQueue(jobs::ProcessCdnLogQueue),
3435
SyncAdmins {
3536
/// Force a sync even if one is already in progress
3637
#[arg(long)]
@@ -89,6 +90,9 @@ pub fn run(command: Command) -> Result<()> {
8990
Command::DailyDbMaintenance => {
9091
jobs::DailyDbMaintenance.enqueue(conn)?;
9192
}
93+
Command::ProcessCdnLogQueue(job) => {
94+
job.enqueue(conn)?;
95+
}
9296
Command::SquashIndex => {
9397
jobs::SquashIndex.enqueue(conn)?;
9498
}

src/worker/jobs/downloads.rs

Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,13 @@
1+
mod message;
2+
13
use crate::config::CdnLogStorageConfig;
4+
use crate::tasks::spawn_blocking;
25
use crate::worker::Environment;
36
use anyhow::Context;
7+
use aws_credential_types::Credentials;
8+
use aws_sdk_sqs::config::{BehaviorVersion, Region};
49
use crates_io_cdn_logs::{count_downloads, Decompressor};
10+
use crates_io_env_vars::required_var;
511
use crates_io_worker::BackgroundJob;
612
use object_store::aws::AmazonS3Builder;
713
use object_store::local::LocalFileSystem;
@@ -130,6 +136,142 @@ impl ProcessCdnLog {
130136
}
131137
}
132138

139+
#[derive(Debug, Serialize, Deserialize, clap::Parser)]
140+
pub struct ProcessCdnLogQueue {
141+
/// The maximum number of messages to receive from the queue and process.
142+
#[clap(long, default_value = "1")]
143+
max_messages: usize,
144+
}
145+
146+
impl BackgroundJob for ProcessCdnLogQueue {
147+
const JOB_NAME: &'static str = "process_cdn_log_queue";
148+
149+
type Context = Arc<Environment>;
150+
151+
async fn run(&self, ctx: Self::Context) -> anyhow::Result<()> {
152+
const MAX_BATCH_SIZE: usize = 10;
153+
154+
let access_key = required_var("CDN_LOG_QUEUE_ACCESS_KEY")?;
155+
let secret_key = required_var("CDN_LOG_QUEUE_SECRET_KEY")?;
156+
let queue_url = required_var("CDN_LOG_QUEUE_URL")?;
157+
let region = required_var("CDN_LOG_QUEUE_REGION")?;
158+
159+
let credentials = Credentials::from_keys(access_key, secret_key, None);
160+
161+
let config = aws_sdk_sqs::Config::builder()
162+
.credentials_provider(credentials)
163+
.region(Region::new(region))
164+
.behavior_version(BehaviorVersion::v2023_11_09())
165+
.build();
166+
167+
let client = aws_sdk_sqs::Client::from_conf(config);
168+
169+
info!("Receiving messages from the CDN log queue…");
170+
let mut num_remaining = self.max_messages;
171+
while num_remaining > 0 {
172+
let batch_size = num_remaining.min(MAX_BATCH_SIZE);
173+
num_remaining -= batch_size;
174+
175+
debug!("Receiving next {batch_size} messages from the CDN log queue…");
176+
let response = client
177+
.receive_message()
178+
.queue_url(&queue_url)
179+
.max_number_of_messages(batch_size as i32)
180+
.send()
181+
.await?;
182+
183+
let messages = response.messages();
184+
debug!(
185+
"Received {num_messages} messages from the CDN log queue",
186+
num_messages = messages.len()
187+
);
188+
if messages.is_empty() {
189+
info!("No more messages to receive from the CDN log queue");
190+
break;
191+
}
192+
193+
for message in messages {
194+
let message_id = message.message_id().unwrap_or("<unknown>");
195+
debug!("Processing message: {message_id}");
196+
197+
let Some(receipt_handle) = message.receipt_handle() else {
198+
warn!("Message {message_id} has no receipt handle; skipping");
199+
continue;
200+
};
201+
202+
debug!("Deleting message {message_id} from the CDN log queue…");
203+
client
204+
.delete_message()
205+
.queue_url(&queue_url)
206+
.receipt_handle(receipt_handle)
207+
.send()
208+
.await
209+
.with_context(|| {
210+
format!("Failed to delete message {message_id} from the CDN log queue")
211+
})?;
212+
213+
let Some(body) = message.body() else {
214+
warn!("Message {message_id} has no body; skipping");
215+
continue;
216+
};
217+
218+
let message = match serde_json::from_str::<message::Message>(body) {
219+
Ok(message) => message,
220+
Err(err) => {
221+
warn!("Failed to parse message {message_id}: {err}");
222+
continue;
223+
}
224+
};
225+
226+
if message.records.is_empty() {
227+
warn!("Message {message_id} has no records; skipping");
228+
continue;
229+
}
230+
231+
let pool = ctx.connection_pool.clone();
232+
spawn_blocking({
233+
let message_id = message_id.to_owned();
234+
move || {
235+
let mut conn = pool
236+
.get()
237+
.context("Failed to acquire database connection")?;
238+
239+
for record in message.records {
240+
let region = record.aws_region;
241+
let bucket = record.s3.bucket.name;
242+
let path = record.s3.object.key;
243+
244+
let path = match object_store::path::Path::from_url_path(&path) {
245+
Ok(path) => path,
246+
Err(err) => {
247+
warn!("Failed to parse path ({path}): {err}");
248+
continue;
249+
}
250+
};
251+
252+
info!("Enqueuing processing job for message {message_id}… ({path})");
253+
let job = ProcessCdnLog::new(region, bucket, path.as_ref().to_owned());
254+
255+
job.enqueue(&mut conn).with_context(|| {
256+
format!("Failed to enqueue processing job for message {message_id}")
257+
})?;
258+
259+
debug!("Enqueued processing job for message {message_id}");
260+
}
261+
262+
Ok::<_, anyhow::Error>(())
263+
}
264+
})
265+
.await?;
266+
267+
debug!("Processed message: {message_id}");
268+
}
269+
}
270+
271+
Ok(())
272+
}
273+
}
274+
133275
#[cfg(test)]
134276
mod tests {
135277
use super::*;
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
{
2+
"Records": []
3+
}
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
{
2+
"Records": [
3+
{
4+
"eventVersion": "2.1",
5+
"eventSource": "aws:s3",
6+
"awsRegion": "us-west-1",
7+
"eventTime": "2024-01-16T16:24:06.554Z",
8+
"eventName": "ObjectCreated:Put",
9+
"userIdentity": {
10+
"principalId": "AWS:AROASLZSKMR5WNLFCYRXL:prod.iad.dbs.datafeeds.aws.internal"
11+
},
12+
"requestParameters": {
13+
"sourceIPAddress": "72.21.217.31"
14+
},
15+
"responseElements": {
16+
"x-amz-request-id": "WK0MBXBCXJ4Y1KDX",
17+
"x-amz-id-2": "Rwojcx6M4ZoUCE8gETkqTmOPO2zKbF7AFeZshk1/nYxdtng47pBPRwsSxbscaQTZu2PVbT9w+u9LTq2KVsoleencxqJ3vCex"
18+
},
19+
"s3": {
20+
"s3SchemaVersion": "1.0",
21+
"configurationId": "cloudfront",
22+
"bucket": {
23+
"name": "rust-staging-crates-io-logs",
24+
"ownerIdentity": {
25+
"principalId": "A266IOH8VA1FN7"
26+
},
27+
"arn": "arn:aws:s3:::rust-staging-crates-io-logs"
28+
},
29+
"object": {
30+
"key": "cloudfront/index.staging.crates.io/E35K556QRQDZXW.2024-01-16-16.d01d5f13.gz",
31+
"size": 710,
32+
"eTag": "4c8266e632a8f1ed489c19821680da6e",
33+
"sequencer": "0065A6ADA6765BF0F0"
34+
}
35+
}
36+
},
37+
{
38+
"eventVersion": "2.1",
39+
"eventSource": "aws:s3",
40+
"awsRegion": "us-west-1",
41+
"eventTime": "2024-01-16T16:24:06.554Z",
42+
"eventName": "ObjectCreated:Put",
43+
"userIdentity": {
44+
"principalId": "AWS:AROASLZSKMR5WNLFCYRXL:prod.iad.dbs.datafeeds.aws.internal"
45+
},
46+
"requestParameters": {
47+
"sourceIPAddress": "72.21.217.31"
48+
},
49+
"responseElements": {
50+
"x-amz-request-id": "WK0MBXBCXJ4Y1KDX",
51+
"x-amz-id-2": "Rwojcx6M4ZoUCE8gETkqTmOPO2zKbF7AFeZshk1/nYxdtng47pBPRwsSxbscaQTZu2PVbT9w+u9LTq2KVsoleencxqJ3vCex"
52+
},
53+
"s3": {
54+
"s3SchemaVersion": "1.0",
55+
"configurationId": "cloudfront",
56+
"bucket": {
57+
"name": "rust-staging-crates-io-logs",
58+
"ownerIdentity": {
59+
"principalId": "A266IOH8VA1FN7"
60+
},
61+
"arn": "arn:aws:s3:::rust-staging-crates-io-logs"
62+
},
63+
"object": {
64+
"key": "cloudfront/index.staging.crates.io/E35K556QRQDZXW.2024-01-16-16.d01d5f13.gz",
65+
"size": 710,
66+
"eTag": "4c8266e632a8f1ed489c19821680da6e",
67+
"sequencer": "0065A6ADA6765BF0F0"
68+
}
69+
}
70+
}
71+
]
72+
}
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
{
2+
"Records": [
3+
{
4+
"eventVersion": "2.1",
5+
"eventSource": "aws:s3",
6+
"awsRegion": "us-west-1",
7+
"eventTime": "2024-01-16T16:24:06.554Z",
8+
"eventName": "ObjectCreated:Put",
9+
"userIdentity": {
10+
"principalId": "AWS:AROASLZSKMR5WNLFCYRXL:prod.iad.dbs.datafeeds.aws.internal"
11+
},
12+
"requestParameters": {
13+
"sourceIPAddress": "72.21.217.31"
14+
},
15+
"responseElements": {
16+
"x-amz-request-id": "WK0MBXBCXJ4Y1KDX",
17+
"x-amz-id-2": "Rwojcx6M4ZoUCE8gETkqTmOPO2zKbF7AFeZshk1/nYxdtng47pBPRwsSxbscaQTZu2PVbT9w+u9LTq2KVsoleencxqJ3vCex"
18+
},
19+
"s3": {
20+
"s3SchemaVersion": "1.0",
21+
"configurationId": "cloudfront",
22+
"bucket": {
23+
"name": "rust-staging-crates-io-logs",
24+
"ownerIdentity": {
25+
"principalId": "A266IOH8VA1FN7"
26+
},
27+
"arn": "arn:aws:s3:::rust-staging-crates-io-logs"
28+
},
29+
"object": {
30+
"key": "cloudfront/index.staging.crates.io/E35K556QRQDZXW.2024-01-16-16.d01d5f13.gz",
31+
"size": 710,
32+
"eTag": "4c8266e632a8f1ed489c19821680da6e",
33+
"sequencer": "0065A6ADA6765BF0F0"
34+
}
35+
}
36+
}
37+
]
38+
}

0 commit comments

Comments
 (0)