Skip to content

Commit c843eb9

Browse files
committed
refactor: cleaning up poller logic
1 parent 30ea085 commit c843eb9

File tree

1 file changed

+21
-19
lines changed

1 file changed

+21
-19
lines changed

src/tasks/tx_poller.rs

Lines changed: 21 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ use eyre::Error;
55
use init4_bin_base::deps::tracing::{Instrument, debug, debug_span, trace};
66
use reqwest::{Client, Url};
77
use serde::{Deserialize, Serialize};
8-
use serde_json::from_slice;
98
use tokio::{sync::mpsc, task::JoinHandle, time};
109

1110
/// Models a response from the transaction pool.
@@ -43,9 +42,14 @@ impl TxPoller {
4342
/// Polls the transaction cache for transactions.
4443
pub async fn check_tx_cache(&mut self) -> Result<Vec<TxEnvelope>, Error> {
4544
let url: Url = Url::parse(&self.config.tx_pool_url)?.join("transactions")?;
46-
let result = self.client.get(url).send().await?;
47-
let response: TxPoolResponse = from_slice(result.text().await?.as_bytes())?;
48-
Ok(response.transactions)
45+
self.client
46+
.get(url)
47+
.send()
48+
.await?
49+
.json()
50+
.await
51+
.map(|resp: TxPoolResponse| resp.transactions)
52+
.map_err(Into::into)
4953
}
5054

5155
async fn task_future(mut self, outbound: mpsc::UnboundedSender<TxEnvelope>) {
@@ -64,24 +68,22 @@ impl TxPoller {
6468
// exit the span after the check.
6569
drop(_guard);
6670

67-
match self.check_tx_cache().instrument(span.clone()).await {
68-
Ok(transactions) => {
69-
let _guard = span.entered();
70-
debug!(count = ?transactions.len(), "found transactions");
71-
for tx in transactions.into_iter() {
72-
if outbound.send(tx).is_err() {
73-
// If there are no receivers, we can shut down
74-
trace!("No receivers left, shutting down");
75-
break;
76-
}
71+
if let Ok(transactions) =
72+
self.check_tx_cache().instrument(span.clone()).await.inspect_err(|err| {
73+
debug!(%err, "Error fetching transactions");
74+
})
75+
{
76+
let _guard = span.entered();
77+
debug!(count = ?transactions.len(), "found transactions");
78+
for tx in transactions.into_iter() {
79+
if outbound.send(tx).is_err() {
80+
// If there are no receivers, we can shut down
81+
trace!("No receivers left, shutting down");
82+
break;
7783
}
7884
}
79-
// If fetching was an error, we log and continue. We expect
80-
// these to be transient network issues.
81-
Err(e) => {
82-
debug!(error = %e, "Error fetching transactions");
83-
}
8485
}
86+
8587
time::sleep(time::Duration::from_millis(self.poll_interval_ms)).await;
8688
}
8789
}

0 commit comments

Comments
 (0)