Skip to content

Commit 4641f9d

Browse files
committed
Add a binary for running background jobs and monitoring their status
These binaries are pretty simple (and a bit spaghetti) for the time being, since we have so little actually using the background job framework. The runner is meant to be run on a worker dyno continuously, while the monitor should be run by a cron-like tool roughly every 5 minutes. We should move `update-downloads` to be scheduled as well, since it spends so little time actually working.
1 parent ef0f56d commit 4641f9d

File tree

4 files changed

+122
-0
lines changed

4 files changed

+122
-0
lines changed

Procfile

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,3 @@
11
web: bin/diesel migration run && bin/start-nginx ./target/release/server
22
worker: ./target/release/update-downloads daemon 300
3+
background_worker: ./target/release/background-worker

src/bin/background-worker.rs

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
// Runs enqueued background jobs
2+
//
3+
// This binary will loop until interrupted. Every second, it will attempt to
4+
// run any jobs in the background queue. Panics if attempting to count
5+
// available jobs fails.
6+
//
7+
// Usage:
8+
// cargo run --bin background-worker
9+
10+
#![deny(warnings)]
11+
12+
use cargo_registry::{background, background_jobs::*, db};
13+
use diesel::r2d2;
14+
use std::env;
15+
use std::thread::sleep;
16+
use std::time::Duration;
17+
18+
fn main() {
19+
let config = cargo_registry::Config::default();
20+
21+
let username = env::var("GIT_HTTP_USER");
22+
let password = env::var("GIT_HTTP_PWD");
23+
let credentials = match (username, password) {
24+
(Ok(u), Ok(p)) => Some((u, p)),
25+
_ => None,
26+
};
27+
let environment = Environment {
28+
index_location: config.index_location,
29+
credentials,
30+
};
31+
32+
// We're only using 1 thread, so we only need 1 connection
33+
let db_config = r2d2::Pool::builder().max_size(1);
34+
let db_pool = db::diesel_pool(&config.db_url, db_config);
35+
36+
let builder = background::Runner::builder(db_pool, environment).thread_count(1);
37+
let runner = job_runner(builder);
38+
39+
println!("Runner booted, running jobs");
40+
41+
loop {
42+
runner
43+
.run_all_pending_jobs()
44+
.expect("Could not begin running jobs");
45+
sleep(Duration::from_secs(1));
46+
}
47+
}

src/bin/monitor.rs

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
//! Checks for any invariants we expect to be true, and pages whoever is on call
2+
//! if they are not.
3+
//!
4+
//! Usage:
5+
//! cargo run --bin monitor
6+
7+
#![deny(warnings)]
8+
9+
#[macro_use]
10+
extern crate serde_derive;
11+
12+
mod on_call;
13+
14+
use cargo_registry::{db, util::CargoResult};
15+
use diesel::prelude::*;
16+
use std::env;
17+
18+
fn main() -> CargoResult<()> {
19+
let conn = db::connect_now()?;
20+
21+
check_stalled_background_jobs(&conn)?;
22+
Ok(())
23+
}
24+
25+
fn check_stalled_background_jobs(conn: &PgConnection) -> CargoResult<()> {
26+
use cargo_registry::schema::background_jobs::dsl::*;
27+
use diesel::dsl::*;
28+
29+
const BACKGROUND_JOB_KEY: &str = "background_jobs";
30+
31+
println!("Checking for stalled background jobs");
32+
33+
let max_job_time = env::var("MAX_JOB_TIME")
34+
.map(|s| s.parse::<i32>().unwrap())
35+
.unwrap_or(15);
36+
37+
let stalled_job_count = background_jobs
38+
.filter(created_at.lt(now - max_job_time.minutes()))
39+
.count()
40+
.get_result::<i64>(conn)?;
41+
42+
let event = if stalled_job_count > 0 {
43+
on_call::Event::Trigger {
44+
incident_key: Some(BACKGROUND_JOB_KEY.into()),
45+
description: format!(
46+
"{} jobs have been in the queue for more than {} minutes",
47+
stalled_job_count, max_job_time
48+
),
49+
}
50+
} else {
51+
on_call::Event::Resolve {
52+
incident_key: BACKGROUND_JOB_KEY.into(),
53+
description: Some("No stalled background jobs".into()),
54+
}
55+
};
56+
57+
log_and_trigger_event(event)?;
58+
Ok(())
59+
}
60+
61+
fn log_and_trigger_event(event: on_call::Event) -> CargoResult<()> {
62+
match event {
63+
on_call::Event::Trigger {
64+
ref description, ..
65+
} => println!("Paging on-call: {}", description),
66+
on_call::Event::Resolve {
67+
description: Some(ref description),
68+
..
69+
} => println!("{}", description),
70+
_ => {} // noop
71+
}
72+
event.send()
73+
}

src/bin/on_call/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ pub enum Event {
1010
incident_key: Option<String>,
1111
description: String,
1212
},
13+
#[allow(dead_code)] // Not all binaries create Acknowledge events
1314
Acknowledge {
1415
incident_key: String,
1516
description: Option<String>,

0 commit comments

Comments
 (0)