Skip to content

Commit 7527730

Browse files
committed
cdn_logs: Implement "count_downloads" example binary
1 parent 6ea9fac commit 7527730

File tree

2 files changed

+80
-1
lines changed

2 files changed

+80
-1
lines changed

crates_io_cdn_logs/Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,10 @@ tracing = "=0.1.40"
2020

2121
[dev-dependencies]
2222
claims = "=0.7.1"
23+
clap = { version = "=4.4.18", features = ["derive"] }
2324
criterion = { version = "=0.5.1", features = ["async_tokio"] }
2425
insta = "=1.34.0"
25-
tokio = { version = "=1.35.1", features = ["macros", "rt"] }
26+
tokio = { version = "=1.35.1", features = ["fs", "macros", "rt", "rt-multi-thread"] }
2627
tracing-subscriber = { version = "=0.3.18", features = ["env-filter"] }
2728

2829
[[bench]]
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
use anyhow::Context;
2+
use clap::Parser;
3+
use crates_io_cdn_logs::{count_downloads, Decompressor};
4+
use std::collections::HashSet;
5+
use std::path::PathBuf;
6+
use std::time::SystemTime;
7+
use tokio::fs::File;
8+
use tokio::io::BufReader;
9+
use tracing::level_filters::LevelFilter;
10+
use tracing_subscriber::{fmt, EnvFilter};
11+
12+
#[derive(Debug, clap::Parser)]
13+
struct Options {
14+
/// The path to the CDN log file to parse
15+
path: PathBuf,
16+
}
17+
18+
#[tokio::main]
19+
async fn main() -> anyhow::Result<()> {
20+
init_tracing();
21+
22+
let options = Options::parse();
23+
24+
let file = File::open(&options.path)
25+
.await
26+
.with_context(|| format!("Failed to open {}", options.path.display()))?;
27+
28+
let reader = BufReader::new(file);
29+
30+
let extension = options
31+
.path
32+
.extension()
33+
.and_then(|ext| ext.to_str())
34+
.unwrap_or_default();
35+
36+
let start = SystemTime::now();
37+
let downloads = match extension {
38+
"gz" | "zst" => {
39+
let decompressor = Decompressor::from_extension(reader, Some(extension))?;
40+
let reader = BufReader::new(decompressor);
41+
count_downloads(reader).await?
42+
}
43+
_ => count_downloads(reader).await?,
44+
};
45+
let duration = start.elapsed()?;
46+
println!("{downloads:?}");
47+
println!();
48+
49+
let num_crates = downloads
50+
.as_inner()
51+
.iter()
52+
.map(|((_, krate, _), _)| krate)
53+
.collect::<HashSet<_>>()
54+
.len();
55+
56+
let total_inserts = downloads.as_inner().len();
57+
58+
let total_downloads = downloads
59+
.as_inner()
60+
.iter()
61+
.map(|(_, downloads)| downloads)
62+
.sum::<u64>();
63+
64+
println!("Number of crates: {num_crates}");
65+
println!("Number of needed inserts: {total_inserts}");
66+
println!("Total number of downloads: {total_downloads}");
67+
println!("Time to parse: {duration:?}");
68+
69+
Ok(())
70+
}
71+
72+
fn init_tracing() {
73+
let env_filter = EnvFilter::builder()
74+
.with_default_directive(LevelFilter::INFO.into())
75+
.from_env_lossy();
76+
77+
fmt().compact().with_env_filter(env_filter).init();
78+
}

0 commit comments

Comments
 (0)