|
1 | 1 | mod message;
|
2 | 2 |
|
3 | 3 | use crate::config::{CdnLogQueueConfig, CdnLogStorageConfig};
|
| 4 | +use crate::db::DieselPool; |
4 | 5 | use crate::sqs::{MockSqsQueue, SqsQueue, SqsQueueImpl};
|
5 | 6 | use crate::tasks::spawn_blocking;
|
6 | 7 | use crate::worker::Environment;
|
@@ -149,9 +150,39 @@ impl BackgroundJob for ProcessCdnLogQueue {
|
149 | 150 | type Context = Arc<Environment>;
|
150 | 151 |
|
151 | 152 | async fn run(&self, ctx: Self::Context) -> anyhow::Result<()> {
|
152 |
| - const MAX_BATCH_SIZE: usize = 10; |
153 |
| - |
154 | 153 | let queue = Self::build_queue(&ctx.config.cdn_log_queue);
|
| 154 | + self.run(queue, &ctx.connection_pool).await |
| 155 | + } |
| 156 | +} |
| 157 | + |
| 158 | +impl ProcessCdnLogQueue { |
| 159 | + fn build_queue(config: &CdnLogQueueConfig) -> Box<dyn SqsQueue + Send + Sync> { |
| 160 | + match config { |
| 161 | + CdnLogQueueConfig::Mock => Box::new(MockSqsQueue::new()), |
| 162 | + CdnLogQueueConfig::SQS { |
| 163 | + access_key, |
| 164 | + secret_key, |
| 165 | + region, |
| 166 | + queue_url, |
| 167 | + } => { |
| 168 | + use secrecy::ExposeSecret; |
| 169 | + |
| 170 | + let secret_key = secret_key.expose_secret(); |
| 171 | + let credentials = Credentials::from_keys(access_key, secret_key, None); |
| 172 | + |
| 173 | + let region = Region::new(region.to_owned()); |
| 174 | + |
| 175 | + Box::new(SqsQueueImpl::new(queue_url, region, credentials)) |
| 176 | + } |
| 177 | + } |
| 178 | + } |
| 179 | + |
| 180 | + async fn run( |
| 181 | + &self, |
| 182 | + queue: Box<dyn SqsQueue + Send + Sync>, |
| 183 | + connection_pool: &DieselPool, |
| 184 | + ) -> anyhow::Result<()> { |
| 185 | + const MAX_BATCH_SIZE: usize = 10; |
155 | 186 |
|
156 | 187 | info!("Receiving messages from the CDN log queue…");
|
157 | 188 | let mut num_remaining = self.max_messages;
|
@@ -207,7 +238,7 @@ impl BackgroundJob for ProcessCdnLogQueue {
|
207 | 238 | continue;
|
208 | 239 | }
|
209 | 240 |
|
210 |
| - let pool = ctx.connection_pool.clone(); |
| 241 | + let pool = connection_pool.clone(); |
211 | 242 | spawn_blocking({
|
212 | 243 | let message_id = message_id.to_owned();
|
213 | 244 | move || {
|
@@ -251,29 +282,6 @@ impl BackgroundJob for ProcessCdnLogQueue {
|
251 | 282 | }
|
252 | 283 | }
|
253 | 284 |
|
254 |
| -impl ProcessCdnLogQueue { |
255 |
| - fn build_queue(config: &CdnLogQueueConfig) -> Box<dyn SqsQueue + Send + Sync> { |
256 |
| - match config { |
257 |
| - CdnLogQueueConfig::Mock => Box::new(MockSqsQueue::new()), |
258 |
| - CdnLogQueueConfig::SQS { |
259 |
| - access_key, |
260 |
| - secret_key, |
261 |
| - region, |
262 |
| - queue_url, |
263 |
| - } => { |
264 |
| - use secrecy::ExposeSecret; |
265 |
| - |
266 |
| - let secret_key = secret_key.expose_secret(); |
267 |
| - let credentials = Credentials::from_keys(access_key, secret_key, None); |
268 |
| - |
269 |
| - let region = Region::new(region.to_owned()); |
270 |
| - |
271 |
| - Box::new(SqsQueueImpl::new(queue_url, region, credentials)) |
272 |
| - } |
273 |
| - } |
274 |
| - } |
275 |
| -} |
276 |
| - |
277 | 285 | #[cfg(test)]
|
278 | 286 | mod tests {
|
279 | 287 | use super::*;
|
|
0 commit comments