diff --git a/src/tasks/bundler.rs b/src/tasks/bundler.rs index 1ea5eee..2db1ddd 100644 --- a/src/tasks/bundler.rs +++ b/src/tasks/bundler.rs @@ -4,8 +4,10 @@ use crate::tasks::oauth::Authenticator; use oauth2::TokenResponse; use reqwest::Url; use serde::{Deserialize, Serialize}; -use tokio::sync::mpsc::{UnboundedReceiver, unbounded_channel}; +use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel}; use tokio::task::JoinHandle; +use tokio::time; +use tracing::{Instrument, debug, trace}; use zenith_types::ZenithEthBundle; /// Holds a bundle from the cache with a unique ID and a Zenith bundle. @@ -69,12 +71,24 @@ impl BundlePoller { Ok(resp.bundles) } - /// Spawns a task that sends bundles it finds to its channel sender. - pub fn spawn(mut self) -> (UnboundedReceiver, JoinHandle<()>) { - let (outbound, inbound) = unbounded_channel(); - let jh = tokio::spawn(async move { - loop { - if let Ok(bundles) = self.check_bundle_cache().await { + async fn task_future(mut self, outbound: UnboundedSender) { + loop { + let span = tracing::debug_span!("BundlePoller::loop", url = %self.config.tx_pool_url); + + // Enter the span for the next check. + let _guard = span.enter(); + + // Check this here to avoid making the web request if we know + // we don't need the results. + if outbound.is_closed() { + trace!("No receivers left, shutting down"); + break; + } + // exit the span after the check. + drop(_guard); + + match self.check_bundle_cache().instrument(span.clone()).await { + Ok(bundles) => { tracing::debug!(count = ?bundles.len(), "found bundles"); for bundle in bundles.into_iter() { if let Err(err) = outbound.send(bundle) { @@ -82,9 +96,21 @@ impl BundlePoller { } } } - tokio::time::sleep(tokio::time::Duration::from_millis(self.poll_interval_ms)).await; + // If fetching was an error, we log and continue. We expect + // these to be transient network issues. + Err(e) => { + debug!(error = %e, "Error fetching bundles"); + } } - }); + time::sleep(time::Duration::from_millis(self.poll_interval_ms)).await; + } + } + + /// Spawns a task that sends bundles it finds to its channel sender. + pub fn spawn(self) -> (UnboundedReceiver, JoinHandle<()>) { + let (outbound, inbound) = unbounded_channel(); + + let jh = tokio::spawn(self.task_future(outbound)); (inbound, jh) }