diff --git a/src/tasks/tx_poller.rs b/src/tasks/tx_poller.rs index 203bad0..abc9430 100644 --- a/src/tasks/tx_poller.rs +++ b/src/tasks/tx_poller.rs @@ -5,7 +5,8 @@ use eyre::Error; use reqwest::{Client, Url}; use serde::{Deserialize, Serialize}; use serde_json::from_slice; -use tokio::{sync::mpsc, task::JoinHandle}; +use tokio::{sync::mpsc, task::JoinHandle, time}; +use tracing::{Instrument, debug, trace}; /// Models a response from the transaction pool. #[derive(Debug, Clone, Serialize, Deserialize)] @@ -46,22 +47,48 @@ impl TxPoller { Ok(response.transactions) } - /// Spawns a task that continuously polls the cache for transactions and sends any it finds to its sender. - pub fn spawn(mut self) -> (mpsc::UnboundedReceiver, JoinHandle<()>) { - let (outbound, inbound) = mpsc::unbounded_channel(); - let jh = tokio::spawn(async move { - loop { - if let Ok(transactions) = self.check_tx_cache().await { - tracing::debug!(count = ?transactions.len(), "found transactions"); + async fn task_future(mut self, outbound: mpsc::UnboundedSender) { + loop { + let span = tracing::debug_span!("TxPoller::loop", url = %self.config.tx_pool_url); + + // Enter the span for the next check. + let _guard = span.enter(); + + // Check this here to avoid making the web request if we know + // we don't need the results. + if outbound.is_closed() { + trace!("No receivers left, shutting down"); + break; + } + // exit the span after the check. + drop(_guard); + + match self.check_tx_cache().instrument(span.clone()).await { + Ok(transactions) => { + let _guard = span.entered(); + debug!(count = ?transactions.len(), "found transactions"); for tx in transactions.into_iter() { - if let Err(err) = outbound.send(tx) { - tracing::error!(err = ?err, "failed to send transaction - channel is dropped."); + if outbound.send(tx).is_err() { + // If there are no receivers, we can shut down + trace!("No receivers left, shutting down"); + break; } } } - tokio::time::sleep(tokio::time::Duration::from_millis(self.poll_interval_ms)).await; + // If fetching was an error, we log and continue. We expect + // these to be transient network issues. + Err(e) => { + debug!(error = %e, "Error fetching transactions"); + } } - }); + time::sleep(time::Duration::from_millis(self.poll_interval_ms)).await; + } + } + + /// Spawns a task that continuously polls the cache for transactions and sends any it finds to its sender. + pub fn spawn(self) -> (mpsc::UnboundedReceiver, JoinHandle<()>) { + let (outbound, inbound) = mpsc::unbounded_channel(); + let jh = tokio::spawn(self.task_future(outbound)); (inbound, jh) } }