Skip to content

Commit 8d37858

Browse files
committed
Extract SqsQueue trait
1 parent 0fb7050 commit 8d37858

File tree

3 files changed

+101
-19
lines changed

3 files changed

+101
-19
lines changed

src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ mod router;
5353
pub mod schema;
5454
pub mod sentry;
5555
pub mod sql;
56+
pub mod sqs;
5657
pub mod ssh;
5758
pub mod storage;
5859
pub mod tasks;

src/sqs.rs

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
use anyhow::Context;
2+
use async_trait::async_trait;
3+
use aws_credential_types::Credentials;
4+
use aws_sdk_sqs::config::{BehaviorVersion, Region};
5+
use aws_sdk_sqs::operation::receive_message::ReceiveMessageOutput;
6+
use mockall::automock;
7+
8+
/// The [SqsQueue] trait defines a basic interface for interacting with an
9+
/// AWS SQS queue.
10+
///
11+
/// A [MockSqsQueue] struct is automatically generated by the [automock]
12+
/// attribute. This struct can be used in unit tests to mock the behavior of
13+
/// the [SqsQueue] trait.
14+
///
15+
/// The [SqsQueueImpl] struct is the actual implementation of the trait.
16+
#[automock]
17+
#[async_trait]
18+
pub trait SqsQueue {
19+
async fn receive_messages(&self, max_messages: i32) -> anyhow::Result<ReceiveMessageOutput>;
20+
async fn delete_message(&self, receipt_handle: &str) -> anyhow::Result<()>;
21+
}
22+
23+
/// The [SqsQueueImpl] struct is the actual implementation of the [SqsQueue]
24+
/// trait, which interacts with the real AWS API servers.
25+
#[derive(Debug, Clone)]
26+
pub struct SqsQueueImpl {
27+
client: aws_sdk_sqs::Client,
28+
queue_url: String,
29+
}
30+
31+
impl SqsQueueImpl {
32+
pub fn new(queue_url: impl Into<String>, region: Region, credentials: Credentials) -> Self {
33+
let config = aws_sdk_sqs::Config::builder()
34+
.credentials_provider(credentials)
35+
.region(region)
36+
.behavior_version(BehaviorVersion::v2023_11_09())
37+
.build();
38+
39+
let client = aws_sdk_sqs::Client::from_conf(config);
40+
let queue_url = queue_url.into();
41+
42+
SqsQueueImpl { client, queue_url }
43+
}
44+
}
45+
46+
#[async_trait]
47+
impl SqsQueue for SqsQueueImpl {
48+
async fn receive_messages(&self, max_messages: i32) -> anyhow::Result<ReceiveMessageOutput> {
49+
let response = self
50+
.client
51+
.receive_message()
52+
.max_number_of_messages(max_messages)
53+
.queue_url(&self.queue_url)
54+
.send()
55+
.await
56+
.context("Failed to receive SQS queue message")?;
57+
58+
Ok(response)
59+
}
60+
61+
async fn delete_message(&self, receipt_handle: &str) -> anyhow::Result<()> {
62+
self.client
63+
.delete_message()
64+
.receipt_handle(receipt_handle)
65+
.queue_url(&self.queue_url)
66+
.send()
67+
.await
68+
.context("Failed to delete SQS queue message")?;
69+
70+
Ok(())
71+
}
72+
}
73+
74+
#[cfg(test)]
75+
mod tests {
76+
use super::*;
77+
78+
#[test]
79+
fn test_constructor() {
80+
let credentials = Credentials::new(
81+
"ANOTREAL",
82+
"notrealrnrELgWzOk3IfjzDKtFBhDby",
83+
None,
84+
None,
85+
"test",
86+
);
87+
88+
let queue_url = "https://sqs.us-west-1.amazonaws.com/359172468976/cdn-log-event-queue";
89+
let region = Region::new("us-west-1");
90+
91+
// Check that `SqsQueueImpl::new()` does not panic.
92+
let _queue = SqsQueueImpl::new(queue_url, region, credentials);
93+
}
94+
}

src/worker/jobs/downloads.rs

Lines changed: 6 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
11
mod message;
22

33
use crate::config::CdnLogStorageConfig;
4+
use crate::sqs::{SqsQueue, SqsQueueImpl};
45
use crate::tasks::spawn_blocking;
56
use crate::worker::Environment;
67
use anyhow::Context;
78
use aws_credential_types::Credentials;
8-
use aws_sdk_sqs::config::{BehaviorVersion, Region};
9+
use aws_sdk_sqs::config::Region;
910
use crates_io_cdn_logs::{count_downloads, Decompressor};
1011
use crates_io_env_vars::required_var;
1112
use crates_io_worker::BackgroundJob;
@@ -158,13 +159,7 @@ impl BackgroundJob for ProcessCdnLogQueue {
158159

159160
let credentials = Credentials::from_keys(access_key, secret_key, None);
160161

161-
let config = aws_sdk_sqs::Config::builder()
162-
.credentials_provider(credentials)
163-
.region(Region::new(region))
164-
.behavior_version(BehaviorVersion::v2023_11_09())
165-
.build();
166-
167-
let client = aws_sdk_sqs::Client::from_conf(config);
162+
let queue = SqsQueueImpl::new(queue_url, Region::new(region), credentials);
168163

169164
info!("Receiving messages from the CDN log queue…");
170165
let mut num_remaining = self.max_messages;
@@ -173,12 +168,7 @@ impl BackgroundJob for ProcessCdnLogQueue {
173168
num_remaining -= batch_size;
174169

175170
debug!("Receiving next {batch_size} messages from the CDN log queue…");
176-
let response = client
177-
.receive_message()
178-
.queue_url(&queue_url)
179-
.max_number_of_messages(batch_size as i32)
180-
.send()
181-
.await?;
171+
let response = queue.receive_messages(batch_size as i32).await?;
182172

183173
let messages = response.messages();
184174
debug!(
@@ -200,11 +190,8 @@ impl BackgroundJob for ProcessCdnLogQueue {
200190
};
201191

202192
debug!("Deleting message {message_id} from the CDN log queue…");
203-
client
204-
.delete_message()
205-
.queue_url(&queue_url)
206-
.receipt_handle(receipt_handle)
207-
.send()
193+
queue
194+
.delete_message(receipt_handle)
208195
.await
209196
.with_context(|| {
210197
format!("Failed to delete message {message_id} from the CDN log queue")

0 commit comments

Comments
 (0)