Skip to content

Commit 58deb25

Browse files
committed
Implement ProcessCdnLogQueue background job
1 parent add51a9 commit 58deb25

13 files changed

+396
-1
lines changed

Cargo.lock

Lines changed: 23 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ async-trait = "=0.1.77"
5252
aws-credential-types = { version = "=1.1.4", features = ["hardcoded-credentials"] }
5353
aws-ip-ranges = "=0.89.0"
5454
aws-sdk-cloudfront = "=1.12.0"
55+
aws-sdk-sqs = "=1.12.0"
5556
axum = { version = "=0.7.4", features = ["macros", "matched-path"] }
5657
axum-extra = { version = "=0.9.2", features = ["cookie-signed", "typed-header"] }
5758
base64 = "=0.21.7"

src/admin/enqueue_job.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ pub enum Command {
3131
#[arg()]
3232
name: String,
3333
},
34+
ProcessCdnLogQueue(jobs::ProcessCdnLogQueue),
3435
SyncAdmins {
3536
/// Force a sync even if one is already in progress
3637
#[arg(long)]
@@ -89,6 +90,9 @@ pub fn run(command: Command) -> Result<()> {
8990
Command::DailyDbMaintenance => {
9091
jobs::DailyDbMaintenance.enqueue(conn)?;
9192
}
93+
Command::ProcessCdnLogQueue(job) => {
94+
job.enqueue(conn)?;
95+
}
9296
Command::SquashIndex => {
9397
jobs::SquashIndex.enqueue(conn)?;
9498
}

src/worker/jobs/downloads.rs

Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,10 @@
1+
mod message;
2+
3+
use crate::tasks::spawn_blocking;
14
use crate::worker::Environment;
25
use anyhow::Context;
6+
use aws_credential_types::Credentials;
7+
use aws_sdk_sqs::config::{BehaviorVersion, Region};
38
use crates_io_cdn_logs::{count_downloads, Decompressor};
49
use crates_io_env_vars::required_var;
510
use crates_io_worker::BackgroundJob;
@@ -104,3 +109,139 @@ impl BackgroundJob for ProcessCdnLog {
104109
Ok(())
105110
}
106111
}
112+
113+
#[derive(Debug, Serialize, Deserialize, clap::Parser)]
114+
pub struct ProcessCdnLogQueue {
115+
/// The maximum number of messages to receive from the queue and process.
116+
#[clap(long, default_value = "1")]
117+
max_messages: usize,
118+
}
119+
120+
impl BackgroundJob for ProcessCdnLogQueue {
121+
const JOB_NAME: &'static str = "process_cdn_log_queue";
122+
123+
type Context = Arc<Environment>;
124+
125+
async fn run(&self, ctx: Self::Context) -> anyhow::Result<()> {
126+
const MAX_BATCH_SIZE: usize = 10;
127+
128+
let queue_url = required_var("CDN_LOG_QUEUE_URL")?;
129+
let region = required_var("CDN_LOG_QUEUE_REGION")?;
130+
let access_key = required_var("CDN_LOG_QUEUE_ACCESS_KEY")?;
131+
let secret_key = required_var("CDN_LOG_QUEUE_SECRET_KEY")?;
132+
133+
let credentials = Credentials::from_keys(access_key, secret_key, None);
134+
135+
let config = aws_sdk_sqs::Config::builder()
136+
.credentials_provider(credentials)
137+
.region(Region::new(region))
138+
.behavior_version(BehaviorVersion::v2023_11_09())
139+
.build();
140+
141+
let client = aws_sdk_sqs::Client::from_conf(config);
142+
143+
info!("Receiving messages from the CDN log queue…");
144+
let mut num_remaining = self.max_messages;
145+
while num_remaining > 0 {
146+
let batch_size = num_remaining.min(MAX_BATCH_SIZE);
147+
num_remaining -= batch_size;
148+
149+
debug!("Receiving next {batch_size} messages from the CDN log queue…");
150+
let response = client
151+
.receive_message()
152+
.queue_url(&queue_url)
153+
.max_number_of_messages(batch_size as i32)
154+
.send()
155+
.await?;
156+
157+
let messages = response.messages();
158+
debug!(
159+
"Received {num_messages} messages from the CDN log queue",
160+
num_messages = messages.len()
161+
);
162+
if messages.is_empty() {
163+
info!("No more messages to receive from the CDN log queue");
164+
break;
165+
}
166+
167+
for message in messages {
168+
let message_id = message.message_id().unwrap_or("<unknown>");
169+
debug!("Processing message: {message_id}");
170+
171+
let Some(receipt_handle) = message.receipt_handle() else {
172+
warn!("Message {message_id} has no receipt handle; skipping");
173+
continue;
174+
};
175+
176+
debug!("Deleting message {message_id} from the CDN log queue…");
177+
client
178+
.delete_message()
179+
.queue_url(&queue_url)
180+
.receipt_handle(receipt_handle)
181+
.send()
182+
.await
183+
.with_context(|| {
184+
format!("Failed to delete message {message_id} from the CDN log queue")
185+
})?;
186+
187+
let Some(body) = message.body() else {
188+
warn!("Message {message_id} has no body; skipping");
189+
continue;
190+
};
191+
192+
let message = match serde_json::from_str::<message::Message>(body) {
193+
Ok(message) => message,
194+
Err(err) => {
195+
warn!("Failed to parse message {message_id}: {err}");
196+
continue;
197+
}
198+
};
199+
200+
if message.records.is_empty() {
201+
warn!("Message {message_id} has no records; skipping");
202+
continue;
203+
}
204+
205+
let pool = ctx.connection_pool.clone();
206+
spawn_blocking({
207+
let message_id = message_id.to_owned();
208+
move || {
209+
let mut conn = pool
210+
.get()
211+
.context("Failed to acquire database connection")?;
212+
213+
for record in message.records {
214+
let region = record.aws_region;
215+
let bucket = record.s3.bucket.name;
216+
let path = record.s3.object.key;
217+
218+
let path = match object_store::path::Path::from_url_path(&path) {
219+
Ok(path) => path,
220+
Err(err) => {
221+
warn!("Failed to parse path ({path}): {err}");
222+
continue;
223+
}
224+
};
225+
226+
info!("Enqueuing processing job for message {message_id}… ({path})");
227+
let job = ProcessCdnLog::new(region, bucket, path.as_ref().to_owned());
228+
229+
job.enqueue(&mut conn).with_context(|| {
230+
format!("Failed to enqueue processing job for message {message_id}")
231+
})?;
232+
233+
debug!("Enqueued processing job for message {message_id}");
234+
}
235+
236+
Ok::<_, anyhow::Error>(())
237+
}
238+
})
239+
.await?;
240+
241+
debug!("Processed message: {message_id}");
242+
}
243+
}
244+
245+
Ok(())
246+
}
247+
}
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
{
2+
"Records": []
3+
}
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
{
2+
"Records": [
3+
{
4+
"eventVersion": "2.1",
5+
"eventSource": "aws:s3",
6+
"awsRegion": "us-west-1",
7+
"eventTime": "2024-01-16T16:24:06.554Z",
8+
"eventName": "ObjectCreated:Put",
9+
"userIdentity": {
10+
"principalId": "AWS:AROASLZSKMR5WNLFCYRXL:prod.iad.dbs.datafeeds.aws.internal"
11+
},
12+
"requestParameters": {
13+
"sourceIPAddress": "72.21.217.31"
14+
},
15+
"responseElements": {
16+
"x-amz-request-id": "WK0MBXBCXJ4Y1KDX",
17+
"x-amz-id-2": "Rwojcx6M4ZoUCE8gETkqTmOPO2zKbF7AFeZshk1/nYxdtng47pBPRwsSxbscaQTZu2PVbT9w+u9LTq2KVsoleencxqJ3vCex"
18+
},
19+
"s3": {
20+
"s3SchemaVersion": "1.0",
21+
"configurationId": "cloudfront",
22+
"bucket": {
23+
"name": "rust-staging-crates-io-logs",
24+
"ownerIdentity": {
25+
"principalId": "A266IOH8VA1FN7"
26+
},
27+
"arn": "arn:aws:s3:::rust-staging-crates-io-logs"
28+
},
29+
"object": {
30+
"key": "cloudfront/index.staging.crates.io/E35K556QRQDZXW.2024-01-16-16.d01d5f13.gz",
31+
"size": 710,
32+
"eTag": "4c8266e632a8f1ed489c19821680da6e",
33+
"sequencer": "0065A6ADA6765BF0F0"
34+
}
35+
}
36+
},
37+
{
38+
"eventVersion": "2.1",
39+
"eventSource": "aws:s3",
40+
"awsRegion": "us-west-1",
41+
"eventTime": "2024-01-16T16:24:06.554Z",
42+
"eventName": "ObjectCreated:Put",
43+
"userIdentity": {
44+
"principalId": "AWS:AROASLZSKMR5WNLFCYRXL:prod.iad.dbs.datafeeds.aws.internal"
45+
},
46+
"requestParameters": {
47+
"sourceIPAddress": "72.21.217.31"
48+
},
49+
"responseElements": {
50+
"x-amz-request-id": "WK0MBXBCXJ4Y1KDX",
51+
"x-amz-id-2": "Rwojcx6M4ZoUCE8gETkqTmOPO2zKbF7AFeZshk1/nYxdtng47pBPRwsSxbscaQTZu2PVbT9w+u9LTq2KVsoleencxqJ3vCex"
52+
},
53+
"s3": {
54+
"s3SchemaVersion": "1.0",
55+
"configurationId": "cloudfront",
56+
"bucket": {
57+
"name": "rust-staging-crates-io-logs",
58+
"ownerIdentity": {
59+
"principalId": "A266IOH8VA1FN7"
60+
},
61+
"arn": "arn:aws:s3:::rust-staging-crates-io-logs"
62+
},
63+
"object": {
64+
"key": "cloudfront/index.staging.crates.io/E35K556QRQDZXW.2024-01-16-16.d01d5f13.gz",
65+
"size": 710,
66+
"eTag": "4c8266e632a8f1ed489c19821680da6e",
67+
"sequencer": "0065A6ADA6765BF0F0"
68+
}
69+
}
70+
}
71+
]
72+
}
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
{
2+
"Records": [
3+
{
4+
"eventVersion": "2.1",
5+
"eventSource": "aws:s3",
6+
"awsRegion": "us-west-1",
7+
"eventTime": "2024-01-16T16:24:06.554Z",
8+
"eventName": "ObjectCreated:Put",
9+
"userIdentity": {
10+
"principalId": "AWS:AROASLZSKMR5WNLFCYRXL:prod.iad.dbs.datafeeds.aws.internal"
11+
},
12+
"requestParameters": {
13+
"sourceIPAddress": "72.21.217.31"
14+
},
15+
"responseElements": {
16+
"x-amz-request-id": "WK0MBXBCXJ4Y1KDX",
17+
"x-amz-id-2": "Rwojcx6M4ZoUCE8gETkqTmOPO2zKbF7AFeZshk1/nYxdtng47pBPRwsSxbscaQTZu2PVbT9w+u9LTq2KVsoleencxqJ3vCex"
18+
},
19+
"s3": {
20+
"s3SchemaVersion": "1.0",
21+
"configurationId": "cloudfront",
22+
"bucket": {
23+
"name": "rust-staging-crates-io-logs",
24+
"ownerIdentity": {
25+
"principalId": "A266IOH8VA1FN7"
26+
},
27+
"arn": "arn:aws:s3:::rust-staging-crates-io-logs"
28+
},
29+
"object": {
30+
"key": "cloudfront/index.staging.crates.io/E35K556QRQDZXW.2024-01-16-16.d01d5f13.gz",
31+
"size": 710,
32+
"eTag": "4c8266e632a8f1ed489c19821680da6e",
33+
"sequencer": "0065A6ADA6765BF0F0"
34+
}
35+
}
36+
}
37+
]
38+
}

src/worker/jobs/downloads/message.rs

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
use std::str::FromStr;
2+
3+
#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
4+
pub struct Object {
5+
pub key: String,
6+
}
7+
8+
#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
9+
pub struct Bucket {
10+
pub name: String,
11+
}
12+
13+
#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
14+
pub struct S3 {
15+
pub bucket: Bucket,
16+
pub object: Object,
17+
}
18+
19+
#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
20+
pub struct Record {
21+
#[serde(rename = "awsRegion")]
22+
pub aws_region: String,
23+
pub s3: S3,
24+
}
25+
26+
#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
27+
pub struct Message {
28+
#[serde(rename = "Records")]
29+
pub records: Vec<Record>,
30+
}
31+
32+
impl FromStr for Message {
33+
type Err = serde_json::Error;
34+
35+
fn from_str(s: &str) -> Result<Self, Self::Err> {
36+
serde_json::from_str(s)
37+
}
38+
}
39+
40+
#[cfg(test)]
41+
mod tests {
42+
use super::*;
43+
use insta::assert_debug_snapshot;
44+
45+
#[test]
46+
fn test_parse_lowlevel_event() {
47+
let event = assert_ok!(include_str!("./fixtures/empty-event.json").parse::<Message>());
48+
assert_debug_snapshot!(event);
49+
50+
let event = assert_ok!(include_str!("./fixtures/valid-event.json").parse::<Message>());
51+
assert_debug_snapshot!(event);
52+
53+
let event = assert_ok!(include_str!("./fixtures/multi-event.json").parse::<Message>());
54+
assert_debug_snapshot!(event);
55+
}
56+
}

0 commit comments

Comments
 (0)