diff --git a/src/tasks/block.rs b/src/tasks/block.rs index 6ba1e8e..e750696 100644 --- a/src/tasks/block.rs +++ b/src/tasks/block.rs @@ -180,7 +180,6 @@ impl BlockBuilder { error!(error = %e, "error polling bundles"); } } - self.bundle_poller.evict(); } /// Simulates a Zenith bundle against the rollup state diff --git a/src/tasks/bundler.rs b/src/tasks/bundler.rs index 670ecc5..acc3cae 100644 --- a/src/tasks/bundler.rs +++ b/src/tasks/bundler.rs @@ -1,58 +1,63 @@ -//! Bundler service responsible for managing bundles. -use super::oauth::Authenticator; - +//! Bundler service responsible for fetching bundles and sending them to the simulator. pub use crate::config::BuilderConfig; - +use crate::tasks::oauth::Authenticator; use oauth2::TokenResponse; use reqwest::Url; use serde::{Deserialize, Serialize}; -use std::collections::HashMap; -use std::time::{Duration, Instant}; +use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel}; +use tokio::task::JoinHandle; +use tokio::time; +use tracing::{Instrument, debug, trace}; use zenith_types::ZenithEthBundle; -/// A bundle response from the tx-pool endpoint, containing a UUID and a -/// [`ZenithEthBundle`]. +/// Holds a bundle from the cache with a unique ID and a Zenith bundle. #[derive(Debug, Clone, Serialize, Deserialize)] pub struct Bundle { - /// The bundle id (a UUID) + /// Cache identifier for the bundle pub id: String, - /// The bundle itself + /// The Zenith bundle for this bundle pub bundle: ZenithEthBundle, } /// Response from the tx-pool containing a list of bundles. #[derive(Debug, Clone, Serialize, Deserialize)] pub struct TxPoolBundleResponse { - /// the list of bundles + /// Bundle responses are available on the bundles property. pub bundles: Vec, } -/// The BundlePoller polls the tx-pool for bundles and manages the seen bundles. -#[derive(Debug)] +/// The BundlePoller polls the tx-pool for bundles. +#[derive(Debug, Clone)] pub struct BundlePoller { - /// Configuration + /// The builder configuration values. pub config: BuilderConfig, - /// [`Authenticator`] for fetching OAuth tokens + /// Authentication module that periodically fetches and stores auth tokens. pub authenticator: Authenticator, - /// Already seen bundle UUIDs - pub seen_uuids: HashMap, + /// Defines the interval at which the bundler polls the tx-pool for bundles. + pub poll_interval_ms: u64, } -/// Implements a poller for the block builder to pull bundles from the tx cache. +/// Implements a poller for the block builder to pull bundles from the tx-pool. impl BundlePoller { /// Creates a new BundlePoller from the provided builder config. pub fn new(config: &BuilderConfig, authenticator: Authenticator) -> Self { - Self { config: config.clone(), authenticator, seen_uuids: HashMap::new() } + Self { config: config.clone(), authenticator, poll_interval_ms: 1000 } } - /// Fetches bundles from the transaction cache and returns the (oldest? random?) bundle in the cache. - pub async fn check_bundle_cache(&mut self) -> eyre::Result> { - let mut unique: Vec = Vec::new(); + /// Creates a new BundlePoller from the provided builder config and with the specified poll interval in ms. + pub fn new_with_poll_interval_ms( + config: &BuilderConfig, + authenticator: Authenticator, + poll_interval_ms: u64, + ) -> Self { + Self { config: config.clone(), authenticator, poll_interval_ms } + } + /// Fetches bundles from the transaction cache and returns them. + pub async fn check_bundle_cache(&mut self) -> eyre::Result> { let bundle_url: Url = Url::parse(&self.config.tx_pool_url)?.join("bundles")?; let token = self.authenticator.fetch_oauth_token().await?; - // Add the token to the request headers let result = reqwest::Client::new() .get(bundle_url) .bearer_auth(token.access_token().secret()) @@ -61,38 +66,52 @@ impl BundlePoller { .error_for_status()?; let body = result.bytes().await?; - let bundles: TxPoolBundleResponse = serde_json::from_slice(&body)?; - - bundles.bundles.iter().for_each(|bundle| { - self.check_seen_bundles(bundle.clone(), &mut unique); - }); + let resp: TxPoolBundleResponse = serde_json::from_slice(&body)?; - Ok(unique) + Ok(resp.bundles) } - /// Checks if the bundle has been seen before and if not, adds it to the unique bundles list. - fn check_seen_bundles(&mut self, bundle: Bundle, unique: &mut Vec) { - self.seen_uuids.entry(bundle.id.clone()).or_insert_with(|| { - // add to the set of unique bundles - unique.push(bundle.clone()); - Instant::now() + Duration::from_secs(self.config.tx_pool_cache_duration) - }); - } + async fn task_future(mut self, outbound: UnboundedSender) { + loop { + let span = tracing::debug_span!("BundlePoller::loop", url = %self.config.tx_pool_url); - /// Evicts expired bundles from the cache. - pub fn evict(&mut self) { - let expired_keys: Vec = self - .seen_uuids - .iter() - .filter_map( - |(key, expiry)| { - if expiry.elapsed().is_zero() { Some(key.clone()) } else { None } - }, - ) - .collect(); - - for key in expired_keys { - self.seen_uuids.remove(&key); + // Enter the span for the next check. + let _guard = span.enter(); + + // Check this here to avoid making the web request if we know + // we don't need the results. + if outbound.is_closed() { + trace!("No receivers left, shutting down"); + break; + } + // exit the span after the check. + drop(_guard); + + match self.check_bundle_cache().instrument(span.clone()).await { + Ok(bundles) => { + tracing::debug!(count = ?bundles.len(), "found bundles"); + for bundle in bundles.into_iter() { + if let Err(err) = outbound.send(bundle) { + tracing::error!(err = ?err, "Failed to send bundle - channel is dropped"); + } + } + } + // If fetching was an error, we log and continue. We expect + // these to be transient network issues. + Err(e) => { + debug!(error = %e, "Error fetching bundles"); + } + } + time::sleep(time::Duration::from_millis(self.poll_interval_ms)).await; } } + + /// Spawns a task that sends bundles it finds to its channel sender. + pub fn spawn(self) -> (UnboundedReceiver, JoinHandle<()>) { + let (outbound, inbound) = unbounded_channel(); + + let jh = tokio::spawn(self.task_future(outbound)); + + (inbound, jh) + } }