Skip to content

Commit b5a5cf4

Browse files
authored
Fix: various updates to the tx poller (#67)
* fix: break loop on closure and improve tracing * refactor: break out the task future
1 parent 1ad6e14 commit b5a5cf4

File tree

1 file changed

+39
-12
lines changed

1 file changed

+39
-12
lines changed

src/tasks/tx_poller.rs

Lines changed: 39 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,8 @@ use eyre::Error;
55
use reqwest::{Client, Url};
66
use serde::{Deserialize, Serialize};
77
use serde_json::from_slice;
8-
use tokio::{sync::mpsc, task::JoinHandle};
8+
use tokio::{sync::mpsc, task::JoinHandle, time};
9+
use tracing::{Instrument, debug, trace};
910

1011
/// Models a response from the transaction pool.
1112
#[derive(Debug, Clone, Serialize, Deserialize)]
@@ -46,22 +47,48 @@ impl TxPoller {
4647
Ok(response.transactions)
4748
}
4849

49-
/// Spawns a task that continuously polls the cache for transactions and sends any it finds to its sender.
50-
pub fn spawn(mut self) -> (mpsc::UnboundedReceiver<TxEnvelope>, JoinHandle<()>) {
51-
let (outbound, inbound) = mpsc::unbounded_channel();
52-
let jh = tokio::spawn(async move {
53-
loop {
54-
if let Ok(transactions) = self.check_tx_cache().await {
55-
tracing::debug!(count = ?transactions.len(), "found transactions");
50+
async fn task_future(mut self, outbound: mpsc::UnboundedSender<TxEnvelope>) {
51+
loop {
52+
let span = tracing::debug_span!("TxPoller::loop", url = %self.config.tx_pool_url);
53+
54+
// Enter the span for the next check.
55+
let _guard = span.enter();
56+
57+
// Check this here to avoid making the web request if we know
58+
// we don't need the results.
59+
if outbound.is_closed() {
60+
trace!("No receivers left, shutting down");
61+
break;
62+
}
63+
// exit the span after the check.
64+
drop(_guard);
65+
66+
match self.check_tx_cache().instrument(span.clone()).await {
67+
Ok(transactions) => {
68+
let _guard = span.entered();
69+
debug!(count = ?transactions.len(), "found transactions");
5670
for tx in transactions.into_iter() {
57-
if let Err(err) = outbound.send(tx) {
58-
tracing::error!(err = ?err, "failed to send transaction - channel is dropped.");
71+
if outbound.send(tx).is_err() {
72+
// If there are no receivers, we can shut down
73+
trace!("No receivers left, shutting down");
74+
break;
5975
}
6076
}
6177
}
62-
tokio::time::sleep(tokio::time::Duration::from_millis(self.poll_interval_ms)).await;
78+
// If fetching was an error, we log and continue. We expect
79+
// these to be transient network issues.
80+
Err(e) => {
81+
debug!(error = %e, "Error fetching transactions");
82+
}
6383
}
64-
});
84+
time::sleep(time::Duration::from_millis(self.poll_interval_ms)).await;
85+
}
86+
}
87+
88+
/// Spawns a task that continuously polls the cache for transactions and sends any it finds to its sender.
89+
pub fn spawn(self) -> (mpsc::UnboundedReceiver<TxEnvelope>, JoinHandle<()>) {
90+
let (outbound, inbound) = mpsc::unbounded_channel();
91+
let jh = tokio::spawn(self.task_future(outbound));
6592
(inbound, jh)
6693
}
6794
}

0 commit comments

Comments
 (0)