From 700b30f0aa3f38a64ef6b173c99e9faeb3f1039b Mon Sep 17 00:00:00 2001 From: James Date: Mon, 7 Apr 2025 09:40:13 +0200 Subject: [PATCH 1/2] fix: break loop on closure and improve tracing --- src/tasks/tx_poller.rs | 39 ++++++++++++++++++++++++++++++++------- 1 file changed, 32 insertions(+), 7 deletions(-) diff --git a/src/tasks/tx_poller.rs b/src/tasks/tx_poller.rs index 203bad0..290cb5e 100644 --- a/src/tasks/tx_poller.rs +++ b/src/tasks/tx_poller.rs @@ -5,7 +5,8 @@ use eyre::Error; use reqwest::{Client, Url}; use serde::{Deserialize, Serialize}; use serde_json::from_slice; -use tokio::{sync::mpsc, task::JoinHandle}; +use tokio::{sync::mpsc, task::JoinHandle, time}; +use tracing::{Instrument, debug, trace}; /// Models a response from the transaction pool. #[derive(Debug, Clone, Serialize, Deserialize)] @@ -51,15 +52,39 @@ impl TxPoller { let (outbound, inbound) = mpsc::unbounded_channel(); let jh = tokio::spawn(async move { loop { - if let Ok(transactions) = self.check_tx_cache().await { - tracing::debug!(count = ?transactions.len(), "found transactions"); - for tx in transactions.into_iter() { - if let Err(err) = outbound.send(tx) { - tracing::error!(err = ?err, "failed to send transaction - channel is dropped."); + let span = tracing::debug_span!("TxPoller::loop", url = %self.config.tx_pool_url); + + // Enter the span for the next check. + let _guard = span.enter(); + + // Check this here to avoid making the web request if we know + // we don't need the results. + if outbound.is_closed() { + trace!("No receivers left, shutting down"); + break; + } + // exit the span after the check. + drop(_guard); + + match self.check_tx_cache().instrument(span.clone()).await { + Ok(transactions) => { + let _guard = span.entered(); + debug!(count = ?transactions.len(), "found transactions"); + for tx in transactions.into_iter() { + if outbound.send(tx).is_err() { + // If there are no receivers, we can shut down + trace!("No receivers left, shutting down"); + break; + } } } + // If fetching was an error, we log and continue. We expect + // these to be transient network issues. + Err(e) => { + debug!(error = %e, "Error fetching transactions"); + } } - tokio::time::sleep(tokio::time::Duration::from_millis(self.poll_interval_ms)).await; + time::sleep(time::Duration::from_millis(self.poll_interval_ms)).await; } }); (inbound, jh) From 6630624bf958b328fc60ca2ffa1a16297865cbad Mon Sep 17 00:00:00 2001 From: James Date: Mon, 7 Apr 2025 09:45:32 +0200 Subject: [PATCH 2/2] refactor: break out the task future --- src/tasks/tx_poller.rs | 68 ++++++++++++++++++++++-------------------- 1 file changed, 35 insertions(+), 33 deletions(-) diff --git a/src/tasks/tx_poller.rs b/src/tasks/tx_poller.rs index 290cb5e..abc9430 100644 --- a/src/tasks/tx_poller.rs +++ b/src/tasks/tx_poller.rs @@ -47,46 +47,48 @@ impl TxPoller { Ok(response.transactions) } - /// Spawns a task that continuously polls the cache for transactions and sends any it finds to its sender. - pub fn spawn(mut self) -> (mpsc::UnboundedReceiver, JoinHandle<()>) { - let (outbound, inbound) = mpsc::unbounded_channel(); - let jh = tokio::spawn(async move { - loop { - let span = tracing::debug_span!("TxPoller::loop", url = %self.config.tx_pool_url); + async fn task_future(mut self, outbound: mpsc::UnboundedSender) { + loop { + let span = tracing::debug_span!("TxPoller::loop", url = %self.config.tx_pool_url); - // Enter the span for the next check. - let _guard = span.enter(); + // Enter the span for the next check. + let _guard = span.enter(); - // Check this here to avoid making the web request if we know - // we don't need the results. - if outbound.is_closed() { - trace!("No receivers left, shutting down"); - break; - } - // exit the span after the check. - drop(_guard); + // Check this here to avoid making the web request if we know + // we don't need the results. + if outbound.is_closed() { + trace!("No receivers left, shutting down"); + break; + } + // exit the span after the check. + drop(_guard); - match self.check_tx_cache().instrument(span.clone()).await { - Ok(transactions) => { - let _guard = span.entered(); - debug!(count = ?transactions.len(), "found transactions"); - for tx in transactions.into_iter() { - if outbound.send(tx).is_err() { - // If there are no receivers, we can shut down - trace!("No receivers left, shutting down"); - break; - } + match self.check_tx_cache().instrument(span.clone()).await { + Ok(transactions) => { + let _guard = span.entered(); + debug!(count = ?transactions.len(), "found transactions"); + for tx in transactions.into_iter() { + if outbound.send(tx).is_err() { + // If there are no receivers, we can shut down + trace!("No receivers left, shutting down"); + break; } } - // If fetching was an error, we log and continue. We expect - // these to be transient network issues. - Err(e) => { - debug!(error = %e, "Error fetching transactions"); - } } - time::sleep(time::Duration::from_millis(self.poll_interval_ms)).await; + // If fetching was an error, we log and continue. We expect + // these to be transient network issues. + Err(e) => { + debug!(error = %e, "Error fetching transactions"); + } } - }); + time::sleep(time::Duration::from_millis(self.poll_interval_ms)).await; + } + } + + /// Spawns a task that continuously polls the cache for transactions and sends any it finds to its sender. + pub fn spawn(self) -> (mpsc::UnboundedReceiver, JoinHandle<()>) { + let (outbound, inbound) = mpsc::unbounded_channel(); + let jh = tokio::spawn(self.task_future(outbound)); (inbound, jh) } }