Skip to content

Commit add51a9

Browse files
committed
Implement ProcessCdnLog background job
1 parent b5d12aa commit add51a9

File tree

5 files changed

+111
-0
lines changed

5 files changed

+111
-0
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ axum-extra = { version = "=0.9.2", features = ["cookie-signed", "typed-header"]
5757
base64 = "=0.21.7"
5858
bigdecimal = "=0.4.2"
5959
cargo-manifest = "=0.13.0"
60+
crates_io_cdn_logs = { path = "crates_io_cdn_logs" }
6061
crates_io_env_vars = { path = "crates_io_env_vars" }
6162
crates_io_github = { path = "crates_io_github" }
6263
crates_io_index = { path = "crates_io_index" }

src/worker/jobs/downloads.rs

Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
use crate::worker::Environment;
2+
use anyhow::Context;
3+
use crates_io_cdn_logs::{count_downloads, Decompressor};
4+
use crates_io_env_vars::required_var;
5+
use crates_io_worker::BackgroundJob;
6+
use object_store::aws::AmazonS3Builder;
7+
use object_store::ObjectStore;
8+
use std::cmp::Reverse;
9+
use std::collections::HashSet;
10+
use std::sync::Arc;
11+
use std::time::SystemTime;
12+
use tokio::io::BufReader;
13+
14+
#[derive(Debug, Serialize, Deserialize)]
15+
pub struct ProcessCdnLog {
16+
region: String,
17+
bucket: String,
18+
path: String,
19+
}
20+
21+
impl ProcessCdnLog {
22+
pub fn new(region: String, bucket: String, path: String) -> Self {
23+
Self {
24+
region,
25+
bucket,
26+
path,
27+
}
28+
}
29+
}
30+
31+
impl BackgroundJob for ProcessCdnLog {
32+
const JOB_NAME: &'static str = "process_cdn_log";
33+
34+
type Context = Arc<Environment>;
35+
36+
async fn run(&self, _ctx: Self::Context) -> anyhow::Result<()> {
37+
let access_key = required_var("AWS_ACCESS_KEY")?;
38+
let secret_key = required_var("AWS_SECRET_KEY")?;
39+
40+
let store = AmazonS3Builder::new()
41+
.with_region(&self.region)
42+
.with_bucket_name(&self.bucket)
43+
.with_access_key_id(access_key)
44+
.with_secret_access_key(secret_key)
45+
.build()
46+
.context("Failed to build object store")?;
47+
48+
let path = object_store::path::Path::parse(&self.path)
49+
.with_context(|| format!("Failed to parse path: {:?}", self.path))?;
50+
51+
let meta = store.head(&path).await;
52+
let meta = meta.with_context(|| format!("Failed to request metadata for {path:?}"))?;
53+
54+
let reader = object_store::buffered::BufReader::new(Arc::new(store), &meta);
55+
let decompressor = Decompressor::from_extension(reader, path.extension())?;
56+
let reader = BufReader::new(decompressor);
57+
58+
let parse_start = SystemTime::now();
59+
let downloads = count_downloads(reader).await?;
60+
let parse_duration = parse_start.elapsed()?;
61+
62+
// TODO: for now this background job just prints out the results, but
63+
// eventually it should insert them into the database instead.
64+
65+
if downloads.as_inner().is_empty() {
66+
info!("No downloads found in log file: {path}");
67+
return Ok(());
68+
}
69+
70+
let num_crates = downloads
71+
.as_inner()
72+
.iter()
73+
.map(|((_, krate, _), _)| krate)
74+
.collect::<HashSet<_>>()
75+
.len();
76+
77+
let total_inserts = downloads.as_inner().len();
78+
79+
let total_downloads = downloads
80+
.as_inner()
81+
.iter()
82+
.map(|(_, downloads)| downloads)
83+
.sum::<u64>();
84+
85+
info!("Log file: {path}");
86+
info!("Number of crates: {num_crates}");
87+
info!("Number of needed inserts: {total_inserts}");
88+
info!("Total number of downloads: {total_downloads}");
89+
info!("Time to parse: {parse_duration:?}");
90+
91+
let mut downloads = downloads.into_inner().into_iter().collect::<Vec<_>>();
92+
downloads.sort_by_key(|((_, _, _), downloads)| Reverse(*downloads));
93+
94+
let top_downloads = downloads
95+
.into_iter()
96+
.take(30)
97+
.map(|((krate, version, date), downloads)| {
98+
format!("{date} {krate}@{version}: {downloads}")
99+
})
100+
.collect::<Vec<_>>();
101+
102+
info!("Top 30 downloads: {top_downloads:?}");
103+
104+
Ok(())
105+
}
106+
}

src/worker/jobs/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use diesel::sql_types::{Int2, Jsonb, Text};
66
use std::fmt::Display;
77

88
mod daily_db_maintenance;
9+
mod downloads;
910
pub mod dump_db;
1011
mod git;
1112
mod readmes;
@@ -14,6 +15,7 @@ mod typosquat;
1415
mod update_downloads;
1516

1617
pub use self::daily_db_maintenance::DailyDbMaintenance;
18+
pub use self::downloads::ProcessCdnLog;
1719
pub use self::dump_db::DumpDb;
1820
pub use self::git::{NormalizeIndex, SquashIndex, SyncToGitIndex, SyncToSparseIndex};
1921
pub use self::readmes::RenderAndUploadReadme;

src/worker/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ impl RunnerExt for Runner<Arc<Environment>> {
2323
.register_job_type::<jobs::DailyDbMaintenance>()
2424
.register_job_type::<jobs::DumpDb>()
2525
.register_job_type::<jobs::NormalizeIndex>()
26+
.register_job_type::<jobs::ProcessCdnLog>()
2627
.register_job_type::<jobs::RenderAndUploadReadme>()
2728
.register_job_type::<jobs::SquashIndex>()
2829
.register_job_type::<jobs::SyncAdmins>()

0 commit comments

Comments (0)