Skip to content

Commit 296a9cb

Browse files
committed
refactor: clean up the other poller too
1 parent c843eb9 commit 296a9cb

File tree

2 files changed

+31
-22
lines changed

2 files changed

+31
-22
lines changed

src/tasks/bundler.rs

Lines changed: 23 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use serde::{Deserialize, Serialize};
77
use signet_bundle::SignetEthBundle;
88
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
99
use tokio::task::JoinHandle;
10-
use tokio::time;
10+
use tokio::time::{self, Duration};
1111

1212
/// Holds a bundle from the cache with a unique ID and a Zenith bundle.
1313
#[derive(Debug, Clone, Serialize, Deserialize)]
@@ -62,18 +62,21 @@ impl BundlePoller {
6262
return Ok(vec![]);
6363
};
6464

65-
let result = self
66-
.client
65+
self.client
6766
.get(bundle_url)
6867
.bearer_auth(token.access_token().secret())
6968
.send()
7069
.await?
71-
.error_for_status()?;
72-
73-
let body = result.bytes().await?;
74-
let resp: TxPoolBundleResponse = serde_json::from_slice(&body)?;
70+
.error_for_status()?
71+
.json()
72+
.await
73+
.map(|resp: TxPoolBundleResponse| resp.bundles)
74+
.map_err(Into::into)
75+
}
7576

76-
Ok(resp.bundles)
77+
/// Returns the poll duration as a [`Duration`].
78+
const fn poll_duration(&self) -> Duration {
79+
Duration::from_millis(self.poll_interval_ms)
7780
}
7881

7982
async fn task_future(mut self, outbound: UnboundedSender<Bundle>) {
@@ -92,22 +95,21 @@ impl BundlePoller {
9295
// exit the span after the check.
9396
drop(_guard);
9497

95-
match self.check_bundle_cache().instrument(span.clone()).await {
96-
Ok(bundles) => {
97-
debug!(count = ?bundles.len(), "found bundles");
98-
for bundle in bundles.into_iter() {
99-
if let Err(err) = outbound.send(bundle) {
100-
error!(err = ?err, "Failed to send bundle - channel is dropped");
101-
}
98+
if let Ok(bundles) = self
99+
.check_bundle_cache()
100+
.instrument(span.clone())
101+
.await
102+
.inspect_err(|err| debug!(%err, "Error fetching bundles"))
103+
{
104+
debug!(count = ?bundles.len(), "found bundles");
105+
for bundle in bundles.into_iter() {
106+
if let Err(err) = outbound.send(bundle) {
107+
error!(err = ?err, "Failed to send bundle - channel is dropped");
102108
}
103109
}
104-
// If fetching was an error, we log and continue. We expect
105-
// these to be transient network issues.
106-
Err(e) => {
107-
debug!(error = %e, "Error fetching bundles");
108-
}
109110
}
110-
time::sleep(time::Duration::from_millis(self.poll_interval_ms)).await;
111+
112+
time::sleep(self.poll_duration()).await;
111113
}
112114
}
113115

src/tasks/tx_poller.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
11
//! Transaction service responsible for fetching and sending trasnsactions to the simulator.
2+
use std::time::Duration;
3+
24
use crate::config::BuilderConfig;
35
use alloy::consensus::TxEnvelope;
46
use eyre::Error;
@@ -39,6 +41,11 @@ impl TxPoller {
3941
Self { config: config.clone(), client: Client::new(), poll_interval_ms }
4042
}
4143

44+
/// Returns the poll duration as a [`Duration`].
45+
const fn poll_duration(&self) -> Duration {
46+
Duration::from_millis(self.poll_interval_ms)
47+
}
48+
4249
/// Polls the transaction cache for transactions.
4350
pub async fn check_tx_cache(&mut self) -> Result<Vec<TxEnvelope>, Error> {
4451
let url: Url = Url::parse(&self.config.tx_pool_url)?.join("transactions")?;
@@ -84,7 +91,7 @@ impl TxPoller {
8491
}
8592
}
8693

87-
time::sleep(time::Duration::from_millis(self.poll_interval_ms)).await;
94+
time::sleep(self.poll_duration()).await;
8895
}
8996
}
9097

0 commit comments

Comments
 (0)