From 3117b85a11774cd02735b2242bcb6c19e110a421 Mon Sep 17 00:00:00 2001 From: dylan Date: Sun, 16 Mar 2025 19:08:11 -0600 Subject: [PATCH 1/5] updates bundler to streaming actor pattern --- src/tasks/block.rs | 1 - src/tasks/bundler.rs | 83 +++++++++++++++++++++----------------------- 2 files changed, 39 insertions(+), 45 deletions(-) diff --git a/src/tasks/block.rs b/src/tasks/block.rs index 6ba1e8e..e750696 100644 --- a/src/tasks/block.rs +++ b/src/tasks/block.rs @@ -180,7 +180,6 @@ impl BlockBuilder { error!(error = %e, "error polling bundles"); } } - self.bundle_poller.evict(); } /// Simulates a Zenith bundle against the rollup state diff --git a/src/tasks/bundler.rs b/src/tasks/bundler.rs index 670ecc5..b43e80b 100644 --- a/src/tasks/bundler.rs +++ b/src/tasks/bundler.rs @@ -1,4 +1,6 @@ //! Bundler service responsible for managing bundles. +use std::sync::Arc; + use super::oauth::Authenticator; pub use crate::config::BuilderConfig; @@ -6,53 +8,56 @@ pub use crate::config::BuilderConfig; use oauth2::TokenResponse; use reqwest::Url; use serde::{Deserialize, Serialize}; -use std::collections::HashMap; -use std::time::{Duration, Instant}; +use tokio::sync::mpsc::{UnboundedReceiver, unbounded_channel}; +use tokio::task::JoinHandle; use zenith_types::ZenithEthBundle; -/// A bundle response from the tx-pool endpoint, containing a UUID and a -/// [`ZenithEthBundle`]. +/// Holds a Signet bundle from the cache that has a unique identifier +/// and a Zenith bundle #[derive(Debug, Clone, Serialize, Deserialize)] pub struct Bundle { - /// The bundle id (a UUID) + /// Cache identifier for the bundle pub id: String, - /// The bundle itself + /// The Zenith bundle for this bundle pub bundle: ZenithEthBundle, } +impl PartialEq for Bundle { + fn eq(&self, other: &Self) -> bool { + self.id == other.id + } +} + +impl Eq for Bundle {} + /// Response from the tx-pool containing a list of bundles. #[derive(Debug, Clone, Serialize, Deserialize)] pub struct TxPoolBundleResponse { - /// the list of bundles + /// Bundle responses are availabel on the bundles property pub bundles: Vec, } /// The BundlePoller polls the tx-pool for bundles and manages the seen bundles. -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct BundlePoller { - /// Configuration + /// The builder configuration values. pub config: BuilderConfig, - /// [`Authenticator`] for fetching OAuth tokens + /// Authentication module that periodically fetches and stores auth tokens. pub authenticator: Authenticator, - /// Already seen bundle UUIDs - pub seen_uuids: HashMap, } /// Implements a poller for the block builder to pull bundles from the tx cache. impl BundlePoller { /// Creates a new BundlePoller from the provided builder config. pub fn new(config: &BuilderConfig, authenticator: Authenticator) -> Self { - Self { config: config.clone(), authenticator, seen_uuids: HashMap::new() } + Self { config: config.clone(), authenticator } } /// Fetches bundles from the transaction cache and returns the (oldest? random?) bundle in the cache. pub async fn check_bundle_cache(&mut self) -> eyre::Result> { - let mut unique: Vec = Vec::new(); - let bundle_url: Url = Url::parse(&self.config.tx_pool_url)?.join("bundles")?; let token = self.authenticator.fetch_oauth_token().await?; - // Add the token to the request headers let result = reqwest::Client::new() .get(bundle_url) .bearer_auth(token.access_token().secret()) @@ -61,38 +66,28 @@ impl BundlePoller { .error_for_status()?; let body = result.bytes().await?; - let bundles: TxPoolBundleResponse = serde_json::from_slice(&body)?; - - bundles.bundles.iter().for_each(|bundle| { - self.check_seen_bundles(bundle.clone(), &mut unique); - }); + let resp: TxPoolBundleResponse = serde_json::from_slice(&body)?; - Ok(unique) + Ok(resp.bundles) } - /// Checks if the bundle has been seen before and if not, adds it to the unique bundles list. - fn check_seen_bundles(&mut self, bundle: Bundle, unique: &mut Vec) { - self.seen_uuids.entry(bundle.id.clone()).or_insert_with(|| { - // add to the set of unique bundles - unique.push(bundle.clone()); - Instant::now() + Duration::from_secs(self.config.tx_pool_cache_duration) + /// Spawns a task that simply sends out any bundles it ever finds + pub fn spawn(mut self) -> (UnboundedReceiver>, JoinHandle<()>) { + let (outbound, inbound) = unbounded_channel(); + let jh = tokio::spawn(async move { + loop { + if let Ok(bundles) = self.check_bundle_cache().await { + tracing::debug!(count = ?bundles.len(), "found bundles"); + for bundle in bundles.iter() { + if let Err(err) = outbound.send(Arc::new(bundle.clone())) { + tracing::error!(err = ?err, "Failed to send bundle"); + } + } + } + tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; + } }); - } - - /// Evicts expired bundles from the cache. - pub fn evict(&mut self) { - let expired_keys: Vec = self - .seen_uuids - .iter() - .filter_map( - |(key, expiry)| { - if expiry.elapsed().is_zero() { Some(key.clone()) } else { None } - }, - ) - .collect(); - for key in expired_keys { - self.seen_uuids.remove(&key); - } + (inbound, jh) } } From 96dce0425f1ba1a96e7090ec7aec82f571966657 Mon Sep 17 00:00:00 2001 From: dylan Date: Mon, 17 Mar 2025 12:20:44 -0600 Subject: [PATCH 2/5] refactors - improves poll interval handling - cleans up and updates comments - removes arc usage --- src/tasks/bundler.rs | 56 +++++++++++++++++++++++--------------------- 1 file changed, 29 insertions(+), 27 deletions(-) diff --git a/src/tasks/bundler.rs b/src/tasks/bundler.rs index b43e80b..accd943 100644 --- a/src/tasks/bundler.rs +++ b/src/tasks/bundler.rs @@ -1,10 +1,6 @@ -//! Bundler service responsible for managing bundles. -use std::sync::Arc; - -use super::oauth::Authenticator; - +//! Bundler service responsible for fetching bundles and sending them to the simulator. pub use crate::config::BuilderConfig; - +use crate::tasks::oauth::Authenticator; use oauth2::TokenResponse; use reqwest::Url; use serde::{Deserialize, Serialize}; @@ -12,8 +8,7 @@ use tokio::sync::mpsc::{UnboundedReceiver, unbounded_channel}; use tokio::task::JoinHandle; use zenith_types::ZenithEthBundle; -/// Holds a Signet bundle from the cache that has a unique identifier -/// and a Zenith bundle +/// Holds a bundle from the cache with a unique ID and a Zenith bundle. #[derive(Debug, Clone, Serialize, Deserialize)] pub struct Bundle { /// Cache identifier for the bundle @@ -22,38 +17,37 @@ pub struct Bundle { pub bundle: ZenithEthBundle, } -impl PartialEq for Bundle { - fn eq(&self, other: &Self) -> bool { - self.id == other.id - } -} - -impl Eq for Bundle {} - /// Response from the tx-pool containing a list of bundles. #[derive(Debug, Clone, Serialize, Deserialize)] pub struct TxPoolBundleResponse { - /// Bundle responses are availabel on the bundles property + /// Bundle responses are available on the bundles property. pub bundles: Vec, } -/// The BundlePoller polls the tx-pool for bundles and manages the seen bundles. +/// The BundlePoller polls the tx-pool for bundles. #[derive(Debug, Clone)] pub struct BundlePoller { /// The builder configuration values. pub config: BuilderConfig, /// Authentication module that periodically fetches and stores auth tokens. pub authenticator: Authenticator, + /// Defines the interval at which the bundler polls the tx-pool for bundles. + pub poll_interval_ms: u64, } -/// Implements a poller for the block builder to pull bundles from the tx cache. +/// Implements a poller for the block builder to pull bundles from the tx-pool. impl BundlePoller { /// Creates a new BundlePoller from the provided builder config. pub fn new(config: &BuilderConfig, authenticator: Authenticator) -> Self { - Self { config: config.clone(), authenticator } + Self { config: config.clone(), authenticator, poll_interval_ms: 1000 } } - /// Fetches bundles from the transaction cache and returns the (oldest? random?) bundle in the cache. + /// Creates a new BundlePoller from the provided builder config and with the specified poll interval in ms. + pub fn new_with_poll_interval_ms(config: &BuilderConfig, authenticator: Authenticator, poll_interval_ms: u64) -> Self { + Self { config: config.clone(), authenticator, poll_interval_ms } + } + + /// Fetches bundles from the transaction cache and returns them. pub async fn check_bundle_cache(&mut self) -> eyre::Result> { let bundle_url: Url = Url::parse(&self.config.tx_pool_url)?.join("bundles")?; let token = self.authenticator.fetch_oauth_token().await?; @@ -71,23 +65,31 @@ impl BundlePoller { Ok(resp.bundles) } - /// Spawns a task that simply sends out any bundles it ever finds - pub fn spawn(mut self) -> (UnboundedReceiver>, JoinHandle<()>) { + /// Spawns a task that sends bundles it finds to its channel sender. + pub fn spawn(mut self) -> (UnboundedReceiver, JoinHandle<()>) { let (outbound, inbound) = unbounded_channel(); let jh = tokio::spawn(async move { loop { if let Ok(bundles) = self.check_bundle_cache().await { tracing::debug!(count = ?bundles.len(), "found bundles"); - for bundle in bundles.iter() { - if let Err(err) = outbound.send(Arc::new(bundle.clone())) { - tracing::error!(err = ?err, "Failed to send bundle"); + for bundle in bundles.into_iter() { + if let Err(err) = outbound.send(bundle) { + tracing::error!(err = ?err, "Failed to send bundle - channel is dropped"); } } } - tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; + tokio::time::sleep(tokio::time::Duration::from_millis(self.poll_interval_ms)).await; } }); (inbound, jh) } } + +impl PartialEq for Bundle { + fn eq(&self, other: &Self) -> bool { + self.id == other.id + } +} + +impl Eq for Bundle {} From 5cfbdc9fcd2f746ab6da539c71a720d30db7418a Mon Sep 17 00:00:00 2001 From: dylan Date: Wed, 19 Mar 2025 14:09:07 -0600 Subject: [PATCH 3/5] fmt --- src/tasks/bundler.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/tasks/bundler.rs b/src/tasks/bundler.rs index accd943..1ea5eee 100644 --- a/src/tasks/bundler.rs +++ b/src/tasks/bundler.rs @@ -43,7 +43,11 @@ impl BundlePoller { } /// Creates a new BundlePoller from the provided builder config and with the specified poll interval in ms. - pub fn new_with_poll_interval_ms(config: &BuilderConfig, authenticator: Authenticator, poll_interval_ms: u64) -> Self { + pub fn new_with_poll_interval_ms( + config: &BuilderConfig, + authenticator: Authenticator, + poll_interval_ms: u64, + ) -> Self { Self { config: config.clone(), authenticator, poll_interval_ms } } From c4522ee516298d1846a99901200776f5afba6021 Mon Sep 17 00:00:00 2001 From: James Date: Mon, 7 Apr 2025 10:01:38 +0200 Subject: [PATCH 4/5] fix: remove eq/partial eq --- src/tasks/bundler.rs | 8 -------- 1 file changed, 8 deletions(-) diff --git a/src/tasks/bundler.rs b/src/tasks/bundler.rs index 1ea5eee..fb10dce 100644 --- a/src/tasks/bundler.rs +++ b/src/tasks/bundler.rs @@ -89,11 +89,3 @@ impl BundlePoller { (inbound, jh) } } - -impl PartialEq for Bundle { - fn eq(&self, other: &Self) -> bool { - self.id == other.id - } -} - -impl Eq for Bundle {} From 01a11e64239c1deda93dceb3acfdd64a650cffd7 Mon Sep 17 00:00:00 2001 From: James Prestwich Date: Tue, 8 Apr 2025 15:56:36 -0400 Subject: [PATCH 5/5] fix: various improvements to bundler tasks (#68) --- src/tasks/bundler.rs | 44 +++++++++++++++++++++++++++++++++++--------- 1 file changed, 35 insertions(+), 9 deletions(-) diff --git a/src/tasks/bundler.rs b/src/tasks/bundler.rs index fb10dce..acc3cae 100644 --- a/src/tasks/bundler.rs +++ b/src/tasks/bundler.rs @@ -4,8 +4,10 @@ use crate::tasks::oauth::Authenticator; use oauth2::TokenResponse; use reqwest::Url; use serde::{Deserialize, Serialize}; -use tokio::sync::mpsc::{UnboundedReceiver, unbounded_channel}; +use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel}; use tokio::task::JoinHandle; +use tokio::time; +use tracing::{Instrument, debug, trace}; use zenith_types::ZenithEthBundle; /// Holds a bundle from the cache with a unique ID and a Zenith bundle. @@ -69,12 +71,24 @@ impl BundlePoller { Ok(resp.bundles) } - /// Spawns a task that sends bundles it finds to its channel sender. - pub fn spawn(mut self) -> (UnboundedReceiver, JoinHandle<()>) { - let (outbound, inbound) = unbounded_channel(); - let jh = tokio::spawn(async move { - loop { - if let Ok(bundles) = self.check_bundle_cache().await { + async fn task_future(mut self, outbound: UnboundedSender) { + loop { + let span = tracing::debug_span!("BundlePoller::loop", url = %self.config.tx_pool_url); + + // Enter the span for the next check. + let _guard = span.enter(); + + // Check this here to avoid making the web request if we know + // we don't need the results. + if outbound.is_closed() { + trace!("No receivers left, shutting down"); + break; + } + // exit the span after the check. + drop(_guard); + + match self.check_bundle_cache().instrument(span.clone()).await { + Ok(bundles) => { tracing::debug!(count = ?bundles.len(), "found bundles"); for bundle in bundles.into_iter() { if let Err(err) = outbound.send(bundle) { @@ -82,9 +96,21 @@ impl BundlePoller { } } } - tokio::time::sleep(tokio::time::Duration::from_millis(self.poll_interval_ms)).await; + // If fetching was an error, we log and continue. We expect + // these to be transient network issues. + Err(e) => { + debug!(error = %e, "Error fetching bundles"); + } } - }); + time::sleep(time::Duration::from_millis(self.poll_interval_ms)).await; + } + } + + /// Spawns a task that sends bundles it finds to its channel sender. + pub fn spawn(self) -> (UnboundedReceiver, JoinHandle<()>) { + let (outbound, inbound) = unbounded_channel(); + + let jh = tokio::spawn(self.task_future(outbound)); (inbound, jh) }