Skip to content

Commit 9a12705

Browse files
committed
worker/jobs/downloads: Extract inner run() fn
1 parent 08836d0 commit 9a12705

File tree

1 file changed

+34
-26
lines changed

1 file changed

+34
-26
lines changed

src/worker/jobs/downloads.rs

Lines changed: 34 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
mod message;
22

33
use crate::config::{CdnLogQueueConfig, CdnLogStorageConfig};
4+
use crate::db::DieselPool;
45
use crate::sqs::{MockSqsQueue, SqsQueue, SqsQueueImpl};
56
use crate::tasks::spawn_blocking;
67
use crate::worker::Environment;
@@ -149,9 +150,39 @@ impl BackgroundJob for ProcessCdnLogQueue {
149150
type Context = Arc<Environment>;
150151

151152
async fn run(&self, ctx: Self::Context) -> anyhow::Result<()> {
152-
const MAX_BATCH_SIZE: usize = 10;
153-
154153
let queue = Self::build_queue(&ctx.config.cdn_log_queue);
154+
self.run(queue, &ctx.connection_pool).await
155+
}
156+
}
157+
158+
impl ProcessCdnLogQueue {
159+
fn build_queue(config: &CdnLogQueueConfig) -> Box<dyn SqsQueue + Send + Sync> {
160+
match config {
161+
CdnLogQueueConfig::Mock => Box::new(MockSqsQueue::new()),
162+
CdnLogQueueConfig::SQS {
163+
access_key,
164+
secret_key,
165+
region,
166+
queue_url,
167+
} => {
168+
use secrecy::ExposeSecret;
169+
170+
let secret_key = secret_key.expose_secret();
171+
let credentials = Credentials::from_keys(access_key, secret_key, None);
172+
173+
let region = Region::new(region.to_owned());
174+
175+
Box::new(SqsQueueImpl::new(queue_url, region, credentials))
176+
}
177+
}
178+
}
179+
180+
async fn run(
181+
&self,
182+
queue: Box<dyn SqsQueue + Send + Sync>,
183+
connection_pool: &DieselPool,
184+
) -> anyhow::Result<()> {
185+
const MAX_BATCH_SIZE: usize = 10;
155186

156187
info!("Receiving messages from the CDN log queue…");
157188
let mut num_remaining = self.max_messages;
@@ -207,7 +238,7 @@ impl BackgroundJob for ProcessCdnLogQueue {
207238
continue;
208239
}
209240

210-
let pool = ctx.connection_pool.clone();
241+
let pool = connection_pool.clone();
211242
spawn_blocking({
212243
let message_id = message_id.to_owned();
213244
move || {
@@ -251,29 +282,6 @@ impl BackgroundJob for ProcessCdnLogQueue {
251282
}
252283
}
253284

254-
impl ProcessCdnLogQueue {
255-
fn build_queue(config: &CdnLogQueueConfig) -> Box<dyn SqsQueue + Send + Sync> {
256-
match config {
257-
CdnLogQueueConfig::Mock => Box::new(MockSqsQueue::new()),
258-
CdnLogQueueConfig::SQS {
259-
access_key,
260-
secret_key,
261-
region,
262-
queue_url,
263-
} => {
264-
use secrecy::ExposeSecret;
265-
266-
let secret_key = secret_key.expose_secret();
267-
let credentials = Credentials::from_keys(access_key, secret_key, None);
268-
269-
let region = Region::new(region.to_owned());
270-
271-
Box::new(SqsQueueImpl::new(queue_url, region, credentials))
272-
}
273-
}
274-
}
275-
}
276-
277285
#[cfg(test)]
278286
mod tests {
279287
use super::*;

0 commit comments

Comments
 (0)