From ef6b7346c3ce44f460f5dad4c93103936b2686fa Mon Sep 17 00:00:00 2001 From: dylan Date: Fri, 7 Mar 2025 14:34:55 -0700 Subject: [PATCH 01/14] updates the tx-poller to stream transactions - shifts the tx-poller to a more actor oriented approach by streaming transactions out of the cache. - transaction deduplication and validation is intended to be carried out by the simulator. --- src/tasks/tx_poller.rs | 37 ++++++++++++++++++++++++++++++------- 1 file changed, 30 insertions(+), 7 deletions(-) diff --git a/src/tasks/tx_poller.rs b/src/tasks/tx_poller.rs index 4a83577..371cb34 100644 --- a/src/tasks/tx_poller.rs +++ b/src/tasks/tx_poller.rs @@ -1,34 +1,38 @@ +use std::sync::Arc; + use alloy::consensus::TxEnvelope; use eyre::Error; use reqwest::{Client, Url}; use serde::{Deserialize, Serialize}; use serde_json::from_slice; +use tokio::{sync::mpsc, task::JoinHandle}; pub use crate::config::BuilderConfig; -/// Response from the tx-pool endpoint. +/// Models a response from the transaction pool. #[derive(Debug, Clone, Serialize, Deserialize)] pub struct TxPoolResponse { + /// Holds the transactions property as a list on the response. transactions: Vec, } /// Implements a poller for the block builder to pull transactions from the transaction pool. -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct TxPoller { - /// config for the builder + /// Config values from the Builder. pub config: BuilderConfig, - /// Reqwest client for fetching transactions from the tx-pool + /// Reqwest Client for fetching transactions from the tx-pool. pub client: Client, } -/// TxPoller implements a poller that fetches unique transactions from the transaction pool. +/// TxPoller implements a poller task that fetches unique transactions from the transaction pool. impl TxPoller { - /// returns a new TxPoller with the given config. + /// Returns a new TxPoller with the given config. pub fn new(config: &BuilderConfig) -> Self { Self { config: config.clone(), client: Client::new() } } - /// polls the tx-pool for unique transactions and evicts expired transactions. + /// Polls the tx-pool for unique transactions and evicts expired transactions. /// unique transactions that haven't been seen before are sent into the builder pipeline. pub async fn check_tx_cache(&mut self) -> Result, Error> { let url: Url = Url::parse(&self.config.tx_pool_url)?.join("transactions")?; @@ -36,4 +40,23 @@ impl TxPoller { let response: TxPoolResponse = from_slice(result.text().await?.as_bytes())?; Ok(response.transactions) } + + /// Spawns a task that trawls the cache for transactions and sends along anything it finds + pub fn spawn(mut self) -> (mpsc::UnboundedReceiver>, JoinHandle<()>) { + let (outbound, inbound) = mpsc::unbounded_channel(); + let jh = tokio::spawn(async move { + loop { + if let Ok(transactions) = self.check_tx_cache().await { + tracing::debug!(count = ?transactions.len(), "found transactions"); + for tx in transactions.iter() { + if let Err(err) = outbound.send(Arc::new(tx.clone())) { + tracing::error!(err = ?err, "failed to send transaction outbound"); + } + } + } + tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; + } + }); + (inbound, jh) + } } From 1ad6e14e914fa478a5f6a583a0f26f66a89baf9c Mon Sep 17 00:00:00 2001 From: dylan Date: Mon, 17 Mar 2025 12:11:32 -0600 Subject: [PATCH 02/14] refactors - improves poll interval handling - removes arcs - updates comments - adds file level comment --- src/tasks/tx_poller.rs | 37 +++++++++++++++++++++---------------- 1 file changed, 21 insertions(+), 16 deletions(-) diff --git a/src/tasks/tx_poller.rs b/src/tasks/tx_poller.rs index 371cb34..203bad0 100644 --- a/src/tasks/tx_poller.rs +++ b/src/tasks/tx_poller.rs @@ -1,5 +1,5 @@ -use std::sync::Arc; - +//! Transaction service responsible for fetching and sending trasnsactions to the simulator. +use crate::config::BuilderConfig; use alloy::consensus::TxEnvelope; use eyre::Error; use reqwest::{Client, Url}; @@ -7,8 +7,6 @@ use serde::{Deserialize, Serialize}; use serde_json::from_slice; use tokio::{sync::mpsc, task::JoinHandle}; -pub use crate::config::BuilderConfig; - /// Models a response from the transaction pool. #[derive(Debug, Clone, Serialize, Deserialize)] pub struct TxPoolResponse { @@ -21,19 +19,26 @@ pub struct TxPoolResponse { pub struct TxPoller { /// Config values from the Builder. pub config: BuilderConfig, - /// Reqwest Client for fetching transactions from the tx-pool. + /// Reqwest Client for fetching transactions from the cache. pub client: Client, + /// Defines the interval at which the service should poll the cache. + pub poll_interval_ms: u64, } -/// TxPoller implements a poller task that fetches unique transactions from the transaction pool. +/// [`TxPoller`] implements a poller task that fetches unique transactions from the transaction pool. impl TxPoller { - /// Returns a new TxPoller with the given config. + /// Returns a new [`TxPoller`] with the given config. + /// * Defaults to 1000ms poll interval (1s). pub fn new(config: &BuilderConfig) -> Self { - Self { config: config.clone(), client: Client::new() } + Self { config: config.clone(), client: Client::new(), poll_interval_ms: 1000 } + } + + /// Returns a new [`TxPoller`] with the given config and cache polling interval in milliseconds. + pub fn new_with_poll_interval_ms(config: &BuilderConfig, poll_interval_ms: u64) -> Self { + Self { config: config.clone(), client: Client::new(), poll_interval_ms } } - /// Polls the tx-pool for unique transactions and evicts expired transactions. - /// unique transactions that haven't been seen before are sent into the builder pipeline. + /// Polls the transaction cache for transactions. pub async fn check_tx_cache(&mut self) -> Result, Error> { let url: Url = Url::parse(&self.config.tx_pool_url)?.join("transactions")?; let result = self.client.get(url).send().await?; @@ -41,20 +46,20 @@ impl TxPoller { Ok(response.transactions) } - /// Spawns a task that trawls the cache for transactions and sends along anything it finds - pub fn spawn(mut self) -> (mpsc::UnboundedReceiver>, JoinHandle<()>) { + /// Spawns a task that continuously polls the cache for transactions and sends any it finds to its sender. + pub fn spawn(mut self) -> (mpsc::UnboundedReceiver, JoinHandle<()>) { let (outbound, inbound) = mpsc::unbounded_channel(); let jh = tokio::spawn(async move { loop { if let Ok(transactions) = self.check_tx_cache().await { tracing::debug!(count = ?transactions.len(), "found transactions"); - for tx in transactions.iter() { - if let Err(err) = outbound.send(Arc::new(tx.clone())) { - tracing::error!(err = ?err, "failed to send transaction outbound"); + for tx in transactions.into_iter() { + if let Err(err) = outbound.send(tx) { + tracing::error!(err = ?err, "failed to send transaction - channel is dropped."); } } } - tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; + tokio::time::sleep(tokio::time::Duration::from_millis(self.poll_interval_ms)).await; } }); (inbound, jh) From b5a5cf4efee4b8450da69bd72d135243e2b09f0a Mon Sep 17 00:00:00 2001 From: James Prestwich Date: Tue, 8 Apr 2025 15:56:17 -0400 Subject: [PATCH 03/14] Fix: various updates to the tx poller (#67) * fix: break loop on closure and improve tracing * refactor: break out the task future --- src/tasks/tx_poller.rs | 51 ++++++++++++++++++++++++++++++++---------- 1 file changed, 39 insertions(+), 12 deletions(-) diff --git a/src/tasks/tx_poller.rs b/src/tasks/tx_poller.rs index 203bad0..abc9430 100644 --- a/src/tasks/tx_poller.rs +++ b/src/tasks/tx_poller.rs @@ -5,7 +5,8 @@ use eyre::Error; use reqwest::{Client, Url}; use serde::{Deserialize, Serialize}; use serde_json::from_slice; -use tokio::{sync::mpsc, task::JoinHandle}; +use tokio::{sync::mpsc, task::JoinHandle, time}; +use tracing::{Instrument, debug, trace}; /// Models a response from the transaction pool. #[derive(Debug, Clone, Serialize, Deserialize)] @@ -46,22 +47,48 @@ impl TxPoller { Ok(response.transactions) } - /// Spawns a task that continuously polls the cache for transactions and sends any it finds to its sender. - pub fn spawn(mut self) -> (mpsc::UnboundedReceiver, JoinHandle<()>) { - let (outbound, inbound) = mpsc::unbounded_channel(); - let jh = tokio::spawn(async move { - loop { - if let Ok(transactions) = self.check_tx_cache().await { - tracing::debug!(count = ?transactions.len(), "found transactions"); + async fn task_future(mut self, outbound: mpsc::UnboundedSender) { + loop { + let span = tracing::debug_span!("TxPoller::loop", url = %self.config.tx_pool_url); + + // Enter the span for the next check. + let _guard = span.enter(); + + // Check this here to avoid making the web request if we know + // we don't need the results. + if outbound.is_closed() { + trace!("No receivers left, shutting down"); + break; + } + // exit the span after the check. + drop(_guard); + + match self.check_tx_cache().instrument(span.clone()).await { + Ok(transactions) => { + let _guard = span.entered(); + debug!(count = ?transactions.len(), "found transactions"); for tx in transactions.into_iter() { - if let Err(err) = outbound.send(tx) { - tracing::error!(err = ?err, "failed to send transaction - channel is dropped."); + if outbound.send(tx).is_err() { + // If there are no receivers, we can shut down + trace!("No receivers left, shutting down"); + break; } } } - tokio::time::sleep(tokio::time::Duration::from_millis(self.poll_interval_ms)).await; + // If fetching was an error, we log and continue. We expect + // these to be transient network issues. + Err(e) => { + debug!(error = %e, "Error fetching transactions"); + } } - }); + time::sleep(time::Duration::from_millis(self.poll_interval_ms)).await; + } + } + + /// Spawns a task that continuously polls the cache for transactions and sends any it finds to its sender. + pub fn spawn(self) -> (mpsc::UnboundedReceiver, JoinHandle<()>) { + let (outbound, inbound) = mpsc::unbounded_channel(); + let jh = tokio::spawn(self.task_future(outbound)); (inbound, jh) } } From 3117b85a11774cd02735b2242bcb6c19e110a421 Mon Sep 17 00:00:00 2001 From: dylan Date: Sun, 16 Mar 2025 19:08:11 -0600 Subject: [PATCH 04/14] updates bundler to streaming actor pattern --- src/tasks/block.rs | 1 - src/tasks/bundler.rs | 83 +++++++++++++++++++++----------------------- 2 files changed, 39 insertions(+), 45 deletions(-) diff --git a/src/tasks/block.rs b/src/tasks/block.rs index 6ba1e8e..e750696 100644 --- a/src/tasks/block.rs +++ b/src/tasks/block.rs @@ -180,7 +180,6 @@ impl BlockBuilder { error!(error = %e, "error polling bundles"); } } - self.bundle_poller.evict(); } /// Simulates a Zenith bundle against the rollup state diff --git a/src/tasks/bundler.rs b/src/tasks/bundler.rs index 670ecc5..b43e80b 100644 --- a/src/tasks/bundler.rs +++ b/src/tasks/bundler.rs @@ -1,4 +1,6 @@ //! Bundler service responsible for managing bundles. +use std::sync::Arc; + use super::oauth::Authenticator; pub use crate::config::BuilderConfig; @@ -6,53 +8,56 @@ pub use crate::config::BuilderConfig; use oauth2::TokenResponse; use reqwest::Url; use serde::{Deserialize, Serialize}; -use std::collections::HashMap; -use std::time::{Duration, Instant}; +use tokio::sync::mpsc::{UnboundedReceiver, unbounded_channel}; +use tokio::task::JoinHandle; use zenith_types::ZenithEthBundle; -/// A bundle response from the tx-pool endpoint, containing a UUID and a -/// [`ZenithEthBundle`]. +/// Holds a Signet bundle from the cache that has a unique identifier +/// and a Zenith bundle #[derive(Debug, Clone, Serialize, Deserialize)] pub struct Bundle { - /// The bundle id (a UUID) + /// Cache identifier for the bundle pub id: String, - /// The bundle itself + /// The Zenith bundle for this bundle pub bundle: ZenithEthBundle, } +impl PartialEq for Bundle { + fn eq(&self, other: &Self) -> bool { + self.id == other.id + } +} + +impl Eq for Bundle {} + /// Response from the tx-pool containing a list of bundles. #[derive(Debug, Clone, Serialize, Deserialize)] pub struct TxPoolBundleResponse { - /// the list of bundles + /// Bundle responses are availabel on the bundles property pub bundles: Vec, } /// The BundlePoller polls the tx-pool for bundles and manages the seen bundles. -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct BundlePoller { - /// Configuration + /// The builder configuration values. pub config: BuilderConfig, - /// [`Authenticator`] for fetching OAuth tokens + /// Authentication module that periodically fetches and stores auth tokens. pub authenticator: Authenticator, - /// Already seen bundle UUIDs - pub seen_uuids: HashMap, } /// Implements a poller for the block builder to pull bundles from the tx cache. impl BundlePoller { /// Creates a new BundlePoller from the provided builder config. pub fn new(config: &BuilderConfig, authenticator: Authenticator) -> Self { - Self { config: config.clone(), authenticator, seen_uuids: HashMap::new() } + Self { config: config.clone(), authenticator } } /// Fetches bundles from the transaction cache and returns the (oldest? random?) bundle in the cache. pub async fn check_bundle_cache(&mut self) -> eyre::Result> { - let mut unique: Vec = Vec::new(); - let bundle_url: Url = Url::parse(&self.config.tx_pool_url)?.join("bundles")?; let token = self.authenticator.fetch_oauth_token().await?; - // Add the token to the request headers let result = reqwest::Client::new() .get(bundle_url) .bearer_auth(token.access_token().secret()) @@ -61,38 +66,28 @@ impl BundlePoller { .error_for_status()?; let body = result.bytes().await?; - let bundles: TxPoolBundleResponse = serde_json::from_slice(&body)?; - - bundles.bundles.iter().for_each(|bundle| { - self.check_seen_bundles(bundle.clone(), &mut unique); - }); + let resp: TxPoolBundleResponse = serde_json::from_slice(&body)?; - Ok(unique) + Ok(resp.bundles) } - /// Checks if the bundle has been seen before and if not, adds it to the unique bundles list. - fn check_seen_bundles(&mut self, bundle: Bundle, unique: &mut Vec) { - self.seen_uuids.entry(bundle.id.clone()).or_insert_with(|| { - // add to the set of unique bundles - unique.push(bundle.clone()); - Instant::now() + Duration::from_secs(self.config.tx_pool_cache_duration) + /// Spawns a task that simply sends out any bundles it ever finds + pub fn spawn(mut self) -> (UnboundedReceiver>, JoinHandle<()>) { + let (outbound, inbound) = unbounded_channel(); + let jh = tokio::spawn(async move { + loop { + if let Ok(bundles) = self.check_bundle_cache().await { + tracing::debug!(count = ?bundles.len(), "found bundles"); + for bundle in bundles.iter() { + if let Err(err) = outbound.send(Arc::new(bundle.clone())) { + tracing::error!(err = ?err, "Failed to send bundle"); + } + } + } + tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; + } }); - } - - /// Evicts expired bundles from the cache. - pub fn evict(&mut self) { - let expired_keys: Vec = self - .seen_uuids - .iter() - .filter_map( - |(key, expiry)| { - if expiry.elapsed().is_zero() { Some(key.clone()) } else { None } - }, - ) - .collect(); - for key in expired_keys { - self.seen_uuids.remove(&key); - } + (inbound, jh) } } From 96dce0425f1ba1a96e7090ec7aec82f571966657 Mon Sep 17 00:00:00 2001 From: dylan Date: Mon, 17 Mar 2025 12:20:44 -0600 Subject: [PATCH 05/14] refactors - improves poll interval handling - cleans up and updates comments - removes arc usage --- src/tasks/bundler.rs | 56 +++++++++++++++++++++++--------------------- 1 file changed, 29 insertions(+), 27 deletions(-) diff --git a/src/tasks/bundler.rs b/src/tasks/bundler.rs index b43e80b..accd943 100644 --- a/src/tasks/bundler.rs +++ b/src/tasks/bundler.rs @@ -1,10 +1,6 @@ -//! Bundler service responsible for managing bundles. -use std::sync::Arc; - -use super::oauth::Authenticator; - +//! Bundler service responsible for fetching bundles and sending them to the simulator. pub use crate::config::BuilderConfig; - +use crate::tasks::oauth::Authenticator; use oauth2::TokenResponse; use reqwest::Url; use serde::{Deserialize, Serialize}; @@ -12,8 +8,7 @@ use tokio::sync::mpsc::{UnboundedReceiver, unbounded_channel}; use tokio::task::JoinHandle; use zenith_types::ZenithEthBundle; -/// Holds a Signet bundle from the cache that has a unique identifier -/// and a Zenith bundle +/// Holds a bundle from the cache with a unique ID and a Zenith bundle. #[derive(Debug, Clone, Serialize, Deserialize)] pub struct Bundle { /// Cache identifier for the bundle @@ -22,38 +17,37 @@ pub struct Bundle { pub bundle: ZenithEthBundle, } -impl PartialEq for Bundle { - fn eq(&self, other: &Self) -> bool { - self.id == other.id - } -} - -impl Eq for Bundle {} - /// Response from the tx-pool containing a list of bundles. #[derive(Debug, Clone, Serialize, Deserialize)] pub struct TxPoolBundleResponse { - /// Bundle responses are availabel on the bundles property + /// Bundle responses are available on the bundles property. pub bundles: Vec, } -/// The BundlePoller polls the tx-pool for bundles and manages the seen bundles. +/// The BundlePoller polls the tx-pool for bundles. #[derive(Debug, Clone)] pub struct BundlePoller { /// The builder configuration values. pub config: BuilderConfig, /// Authentication module that periodically fetches and stores auth tokens. pub authenticator: Authenticator, + /// Defines the interval at which the bundler polls the tx-pool for bundles. + pub poll_interval_ms: u64, } -/// Implements a poller for the block builder to pull bundles from the tx cache. +/// Implements a poller for the block builder to pull bundles from the tx-pool. impl BundlePoller { /// Creates a new BundlePoller from the provided builder config. pub fn new(config: &BuilderConfig, authenticator: Authenticator) -> Self { - Self { config: config.clone(), authenticator } + Self { config: config.clone(), authenticator, poll_interval_ms: 1000 } } - /// Fetches bundles from the transaction cache and returns the (oldest? random?) bundle in the cache. + /// Creates a new BundlePoller from the provided builder config and with the specified poll interval in ms. + pub fn new_with_poll_interval_ms(config: &BuilderConfig, authenticator: Authenticator, poll_interval_ms: u64) -> Self { + Self { config: config.clone(), authenticator, poll_interval_ms } + } + + /// Fetches bundles from the transaction cache and returns them. pub async fn check_bundle_cache(&mut self) -> eyre::Result> { let bundle_url: Url = Url::parse(&self.config.tx_pool_url)?.join("bundles")?; let token = self.authenticator.fetch_oauth_token().await?; @@ -71,23 +65,31 @@ impl BundlePoller { Ok(resp.bundles) } - /// Spawns a task that simply sends out any bundles it ever finds - pub fn spawn(mut self) -> (UnboundedReceiver>, JoinHandle<()>) { + /// Spawns a task that sends bundles it finds to its channel sender. + pub fn spawn(mut self) -> (UnboundedReceiver, JoinHandle<()>) { let (outbound, inbound) = unbounded_channel(); let jh = tokio::spawn(async move { loop { if let Ok(bundles) = self.check_bundle_cache().await { tracing::debug!(count = ?bundles.len(), "found bundles"); - for bundle in bundles.iter() { - if let Err(err) = outbound.send(Arc::new(bundle.clone())) { - tracing::error!(err = ?err, "Failed to send bundle"); + for bundle in bundles.into_iter() { + if let Err(err) = outbound.send(bundle) { + tracing::error!(err = ?err, "Failed to send bundle - channel is dropped"); } } } - tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; + tokio::time::sleep(tokio::time::Duration::from_millis(self.poll_interval_ms)).await; } }); (inbound, jh) } } + +impl PartialEq for Bundle { + fn eq(&self, other: &Self) -> bool { + self.id == other.id + } +} + +impl Eq for Bundle {} From 5cfbdc9fcd2f746ab6da539c71a720d30db7418a Mon Sep 17 00:00:00 2001 From: dylan Date: Wed, 19 Mar 2025 14:09:07 -0600 Subject: [PATCH 06/14] fmt --- src/tasks/bundler.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/tasks/bundler.rs b/src/tasks/bundler.rs index accd943..1ea5eee 100644 --- a/src/tasks/bundler.rs +++ b/src/tasks/bundler.rs @@ -43,7 +43,11 @@ impl BundlePoller { } /// Creates a new BundlePoller from the provided builder config and with the specified poll interval in ms. - pub fn new_with_poll_interval_ms(config: &BuilderConfig, authenticator: Authenticator, poll_interval_ms: u64) -> Self { + pub fn new_with_poll_interval_ms( + config: &BuilderConfig, + authenticator: Authenticator, + poll_interval_ms: u64, + ) -> Self { Self { config: config.clone(), authenticator, poll_interval_ms } } From c4522ee516298d1846a99901200776f5afba6021 Mon Sep 17 00:00:00 2001 From: James Date: Mon, 7 Apr 2025 10:01:38 +0200 Subject: [PATCH 07/14] fix: remove eq/partial eq --- src/tasks/bundler.rs | 8 -------- 1 file changed, 8 deletions(-) diff --git a/src/tasks/bundler.rs b/src/tasks/bundler.rs index 1ea5eee..fb10dce 100644 --- a/src/tasks/bundler.rs +++ b/src/tasks/bundler.rs @@ -89,11 +89,3 @@ impl BundlePoller { (inbound, jh) } } - -impl PartialEq for Bundle { - fn eq(&self, other: &Self) -> bool { - self.id == other.id - } -} - -impl Eq for Bundle {} From 01a11e64239c1deda93dceb3acfdd64a650cffd7 Mon Sep 17 00:00:00 2001 From: James Prestwich Date: Tue, 8 Apr 2025 15:56:36 -0400 Subject: [PATCH 08/14] fix: various improvements to bundler tasks (#68) --- src/tasks/bundler.rs | 44 +++++++++++++++++++++++++++++++++++--------- 1 file changed, 35 insertions(+), 9 deletions(-) diff --git a/src/tasks/bundler.rs b/src/tasks/bundler.rs index fb10dce..acc3cae 100644 --- a/src/tasks/bundler.rs +++ b/src/tasks/bundler.rs @@ -4,8 +4,10 @@ use crate::tasks::oauth::Authenticator; use oauth2::TokenResponse; use reqwest::Url; use serde::{Deserialize, Serialize}; -use tokio::sync::mpsc::{UnboundedReceiver, unbounded_channel}; +use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel}; use tokio::task::JoinHandle; +use tokio::time; +use tracing::{Instrument, debug, trace}; use zenith_types::ZenithEthBundle; /// Holds a bundle from the cache with a unique ID and a Zenith bundle. @@ -69,12 +71,24 @@ impl BundlePoller { Ok(resp.bundles) } - /// Spawns a task that sends bundles it finds to its channel sender. - pub fn spawn(mut self) -> (UnboundedReceiver, JoinHandle<()>) { - let (outbound, inbound) = unbounded_channel(); - let jh = tokio::spawn(async move { - loop { - if let Ok(bundles) = self.check_bundle_cache().await { + async fn task_future(mut self, outbound: UnboundedSender) { + loop { + let span = tracing::debug_span!("BundlePoller::loop", url = %self.config.tx_pool_url); + + // Enter the span for the next check. + let _guard = span.enter(); + + // Check this here to avoid making the web request if we know + // we don't need the results. + if outbound.is_closed() { + trace!("No receivers left, shutting down"); + break; + } + // exit the span after the check. + drop(_guard); + + match self.check_bundle_cache().instrument(span.clone()).await { + Ok(bundles) => { tracing::debug!(count = ?bundles.len(), "found bundles"); for bundle in bundles.into_iter() { if let Err(err) = outbound.send(bundle) { @@ -82,9 +96,21 @@ impl BundlePoller { } } } - tokio::time::sleep(tokio::time::Duration::from_millis(self.poll_interval_ms)).await; + // If fetching was an error, we log and continue. We expect + // these to be transient network issues. + Err(e) => { + debug!(error = %e, "Error fetching bundles"); + } } - }); + time::sleep(time::Duration::from_millis(self.poll_interval_ms)).await; + } + } + + /// Spawns a task that sends bundles it finds to its channel sender. + pub fn spawn(self) -> (UnboundedReceiver, JoinHandle<()>) { + let (outbound, inbound) = unbounded_channel(); + + let jh = tokio::spawn(self.task_future(outbound)); (inbound, jh) } From 63742382d6f0ed487cd44c7ec21bcc9d74aa8b74 Mon Sep 17 00:00:00 2001 From: dylan Date: Sun, 16 Mar 2025 19:13:52 -0600 Subject: [PATCH 09/14] chore: update to alloy@0.11 - adds init4 metrics - updates provider types to account for alloy changes --- Cargo.toml | 6 ++++-- bin/submit_transaction.rs | 1 - src/config.rs | 45 ++++++++++++++++++++------------------- src/lib.rs | 3 +++ 4 files changed, 30 insertions(+), 25 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index e4e8899..70b41b2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -24,9 +24,9 @@ path = "bin/submit_transaction.rs" [dependencies] init4-bin-base = "0.1.0" -zenith-types = "0.13" +zenith-types = "0.15" -alloy = { version = "0.7.3", features = ["full", "json-rpc", "signer-aws", "rpc-types-mev", "rlp"] } +alloy = { version = "=0.11.1", features = ["full", "json-rpc", "signer-aws", "rpc-types-mev", "rlp", "node-bindings"] } aws-config = "1.1.7" aws-sdk-kms = "1.15.0" @@ -48,3 +48,5 @@ tokio = { version = "1.36.0", features = ["full", "macros", "rt-multi-thread"] } async-trait = "0.1.80" oauth2 = "4.4.2" +metrics = "0.24.1" +metrics-exporter-prometheus = "0.16.0" diff --git a/bin/submit_transaction.rs b/bin/submit_transaction.rs index 3955a49..708a37b 100644 --- a/bin/submit_transaction.rs +++ b/bin/submit_transaction.rs @@ -82,7 +82,6 @@ async fn connect_from_config() -> (Provider, Address, u64) { let signer = AwsSigner::new(client, kms_key_id.to_string(), Some(chain_id)).await.unwrap(); let provider = ProviderBuilder::new() - .with_recommended_fillers() .wallet(EthereumWallet::from(signer)) .on_builtin(&rpc_url) .await diff --git a/src/config.rs b/src/config.rs index 1e8a6fa..cb209ea 100644 --- a/src/config.rs +++ b/src/config.rs @@ -9,9 +9,9 @@ use alloy::{ WalletFiller, }, }, - transports::BoxTransport, }; use std::{borrow::Cow, env, num, str::FromStr}; + use zenith_types::Zenith; // Keys for .env variables that need to be set to configure the builder. @@ -125,7 +125,7 @@ impl ConfigError { } } -/// Provider type used to read & write. +/// Defines a full provider pub type Provider = FillProvider< JoinFill< JoinFill< @@ -134,26 +134,22 @@ pub type Provider = FillProvider< >, WalletFiller, >, - RootProvider, - BoxTransport, + RootProvider, Ethereum, >; -/// Provider type used to read-only. +/// Defines a read-only wallet pub type WalletlessProvider = FillProvider< JoinFill< Identity, JoinFill>>, >, - RootProvider, - BoxTransport, + RootProvider, Ethereum, >; -/// A Zenith contract instance, using some provider `P` (defaults to -/// [`Provider`]). -pub type ZenithInstance

= - Zenith::ZenithInstance; +/// Defines a [`Zenith`] instance that is generic over [`Provider`] +pub type ZenithInstance = Zenith::ZenithInstance<(), Provider, alloy::network::Ethereum>; impl BuilderConfig { /// Load the builder configuration from environment variables. @@ -210,32 +206,36 @@ impl BuilderConfig { /// Connect to the Rollup rpc provider. pub async fn connect_ru_provider(&self) -> Result { - ProviderBuilder::new() - .with_recommended_fillers() + let provider = ProviderBuilder::new() .on_builtin(&self.ru_rpc_url) .await - .map_err(Into::into) + .map_err(ConfigError::Provider)?; + + Ok(provider) } /// Connect to the Host rpc provider. pub async fn connect_host_provider(&self) -> Result { let builder_signer = self.connect_builder_signer().await?; - ProviderBuilder::new() - .with_recommended_fillers() + let provider = ProviderBuilder::new() .wallet(EthereumWallet::from(builder_signer)) .on_builtin(&self.host_rpc_url) .await - .map_err(Into::into) + .map_err(ConfigError::Provider)?; + + Ok(provider) } - /// Connect additional broadcast providers. + /// Connect additionally configured non-host providers to broadcast transactions to. pub async fn connect_additional_broadcast( &self, - ) -> Result>, ConfigError> { - let mut providers = Vec::with_capacity(self.tx_broadcast_urls.len()); + ) -> Result, ConfigError> { + let mut providers: Vec = + Vec::with_capacity(self.tx_broadcast_urls.len()); for url in self.tx_broadcast_urls.iter() { let provider = - ProviderBuilder::new().on_builtin(url).await.map_err(Into::::into)?; + ProviderBuilder::new().on_builtin(url).await.map_err(ConfigError::Provider)?; + providers.push(provider); } Ok(providers) @@ -278,5 +278,6 @@ pub fn load_url(key: &str) -> Result, ConfigError> { /// Load an address from an environment variable. pub fn load_address(key: &str) -> Result { let address = load_string(key)?; - Address::from_str(&address).map_err(Into::into) + Address::from_str(&address) + .map_err(|_| ConfigError::Var(format!("Invalid address format for {}", key))) } diff --git a/src/lib.rs b/src/lib.rs index 5d4e743..562d3c2 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -27,4 +27,7 @@ pub mod tasks; /// Utilities. pub mod utils; +/// Anonymous crate dependency imports. +use metrics as _; +use metrics_exporter_prometheus as _; use openssl as _; From b1324b0505daca1b8ffad81d36663e8529dc0224 Mon Sep 17 00:00:00 2001 From: dylan Date: Mon, 17 Mar 2025 12:36:59 -0600 Subject: [PATCH 10/14] refactors to account for init4 bin base in builder craate --- Cargo.toml | 2 -- src/config.rs | 8 ++++---- src/lib.rs | 2 -- 3 files changed, 4 insertions(+), 8 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 70b41b2..d1b0f34 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -48,5 +48,3 @@ tokio = { version = "1.36.0", features = ["full", "macros", "rt-multi-thread"] } async-trait = "0.1.80" oauth2 = "4.4.2" -metrics = "0.24.1" -metrics-exporter-prometheus = "0.16.0" diff --git a/src/config.rs b/src/config.rs index cb209ea..5fe6d36 100644 --- a/src/config.rs +++ b/src/config.rs @@ -125,7 +125,7 @@ impl ConfigError { } } -/// Defines a full provider +/// Defines a full provider. pub type Provider = FillProvider< JoinFill< JoinFill< @@ -138,7 +138,7 @@ pub type Provider = FillProvider< Ethereum, >; -/// Defines a read-only wallet +/// Defines a provider type used to read-only. pub type WalletlessProvider = FillProvider< JoinFill< Identity, @@ -149,7 +149,7 @@ pub type WalletlessProvider = FillProvider< >; /// Defines a [`Zenith`] instance that is generic over [`Provider`] -pub type ZenithInstance = Zenith::ZenithInstance<(), Provider, alloy::network::Ethereum>; +pub type ZenithInstance

= Zenith::ZenithInstance<(), P, alloy::network::Ethereum>; impl BuilderConfig { /// Load the builder configuration from environment variables. @@ -226,7 +226,7 @@ impl BuilderConfig { Ok(provider) } - /// Connect additionally configured non-host providers to broadcast transactions to. + /// Connect additional broadcast providers. pub async fn connect_additional_broadcast( &self, ) -> Result, ConfigError> { diff --git a/src/lib.rs b/src/lib.rs index 562d3c2..3ab4b90 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -28,6 +28,4 @@ pub mod tasks; pub mod utils; /// Anonymous crate dependency imports. -use metrics as _; -use metrics_exporter_prometheus as _; use openssl as _; From 976dd03c328feb330a159883c860a3fccecf282a Mon Sep 17 00:00:00 2001 From: dylan Date: Thu, 27 Mar 2025 12:58:32 -0600 Subject: [PATCH 11/14] use signet-sdk - align to alloy @ 0.12.6 - transfer off of zenith-rs and use signet-sdk instead --- Cargo.toml | 20 +++++++++++++++----- bin/submit_transaction.rs | 2 +- src/config.rs | 8 ++++---- src/tasks/block.rs | 11 ++++++----- src/tasks/bundler.rs | 2 +- src/tasks/submit.rs | 16 ++++++++-------- 6 files changed, 35 insertions(+), 24 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index d1b0f34..8d7a7a9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,11 +22,21 @@ name = "transaction-submitter" path = "bin/submit_transaction.rs" [dependencies] -init4-bin-base = "0.1.0" - -zenith-types = "0.15" - -alloy = { version = "=0.11.1", features = ["full", "json-rpc", "signer-aws", "rpc-types-mev", "rlp", "node-bindings"] } +init4-bin-base = { path = "../bin-base" } + +signet-bundle = { git = "https://github.com/init4tech/signet-sdk", branch = "prestwich/update-trevm" } +signet-types = { git = "https://github.com/init4tech/signet-sdk", branch = "prestwich/update-trevm" } +signet-zenith = { git = "https://github.com/init4tech/signet-sdk", branch = "prestwich/update-trevm" } + +alloy = { version = "0.12.6", features = [ + "full", + "json-rpc", + "signer-aws", + "rpc-types-mev", + "rlp", + "node-bindings", + "serde", +] } aws-config = "1.1.7" aws-sdk-kms = "1.15.0" diff --git a/bin/submit_transaction.rs b/bin/submit_transaction.rs index 708a37b..addb205 100644 --- a/bin/submit_transaction.rs +++ b/bin/submit_transaction.rs @@ -83,7 +83,7 @@ async fn connect_from_config() -> (Provider, Address, u64) { let provider = ProviderBuilder::new() .wallet(EthereumWallet::from(signer)) - .on_builtin(&rpc_url) + .connect(&rpc_url) .await .unwrap(); diff --git a/src/config.rs b/src/config.rs index 5fe6d36..6864b84 100644 --- a/src/config.rs +++ b/src/config.rs @@ -12,7 +12,7 @@ use alloy::{ }; use std::{borrow::Cow, env, num, str::FromStr}; -use zenith_types::Zenith; +use signet_zenith::Zenith; // Keys for .env variables that need to be set to configure the builder. const HOST_CHAIN_ID: &str = "HOST_CHAIN_ID"; @@ -207,7 +207,7 @@ impl BuilderConfig { /// Connect to the Rollup rpc provider. pub async fn connect_ru_provider(&self) -> Result { let provider = ProviderBuilder::new() - .on_builtin(&self.ru_rpc_url) + .connect(&self.ru_rpc_url) .await .map_err(ConfigError::Provider)?; @@ -219,7 +219,7 @@ impl BuilderConfig { let builder_signer = self.connect_builder_signer().await?; let provider = ProviderBuilder::new() .wallet(EthereumWallet::from(builder_signer)) - .on_builtin(&self.host_rpc_url) + .connect(&self.host_rpc_url) .await .map_err(ConfigError::Provider)?; @@ -234,7 +234,7 @@ impl BuilderConfig { Vec::with_capacity(self.tx_broadcast_urls.len()); for url in self.tx_broadcast_urls.iter() { let provider = - ProviderBuilder::new().on_builtin(url).await.map_err(ConfigError::Provider)?; + ProviderBuilder::new().connect(url).await.map_err(ConfigError::Provider)?; providers.push(provider); } diff --git a/src/tasks/block.rs b/src/tasks/block.rs index e750696..dd1a050 100644 --- a/src/tasks/block.rs +++ b/src/tasks/block.rs @@ -3,7 +3,7 @@ use super::oauth::Authenticator; use super::tx_poller::TxPoller; use crate::config::{BuilderConfig, WalletlessProvider}; use alloy::{ - consensus::{SidecarBuilder, SidecarCoder, TxEnvelope}, + consensus::{SidecarBuilder, SidecarCoder, transaction::TxEnvelope}, eips::eip2718::Decodable2718, primitives::{B256, Bytes, keccak256}, providers::Provider as _, @@ -13,7 +13,8 @@ use std::time::{SystemTime, UNIX_EPOCH}; use std::{sync::OnceLock, time::Duration}; use tokio::{sync::mpsc, task::JoinHandle}; use tracing::{Instrument, debug, error, info, trace}; -use zenith_types::{Alloy2718Coder, ZenithEthBundle, encode_txns}; +use signet_zenith::{Alloy2718Coder, encode_txns}; +use signet_bundle::SignetEthBundle; /// Ethereum's slot time in seconds. pub const ETHEREUM_SLOT_TIME: u64 = 12; @@ -183,7 +184,7 @@ impl BlockBuilder { } /// Simulates a Zenith bundle against the rollup state - async fn simulate_bundle(&mut self, bundle: &ZenithEthBundle) -> eyre::Result<()> { + async fn simulate_bundle(&mut self, bundle: &SignetEthBundle) -> eyre::Result<()> { // TODO: Simulate bundles with the Simulation Engine // [ENG-672](https://linear.app/initiates/issue/ENG-672/add-support-for-bundles) debug!(hash = ?bundle.bundle.bundle_hash(), block_number = ?bundle.block_number(), "bundle simulations is not implemented yet - skipping simulation"); @@ -269,7 +270,7 @@ mod tests { rpc::types::{TransactionRequest, mev::EthSendBundle}, signers::local::PrivateKeySigner, }; - use zenith_types::ZenithEthBundle; + use signet_bundle::SignetEthBundle; /// Create a mock bundle for testing with a single transaction async fn create_mock_bundle(wallet: &EthereumWallet) -> Bundle { @@ -294,7 +295,7 @@ mod tests { replacement_uuid: Some("replacement_uuid".to_owned()), }; - let zenith_bundle = ZenithEthBundle { bundle: eth_bundle, host_fills: None }; + let zenith_bundle = SignetEthBundle { bundle: eth_bundle, host_fills: None }; Bundle { id: "mock_bundle".to_owned(), bundle: zenith_bundle } } diff --git a/src/tasks/bundler.rs b/src/tasks/bundler.rs index acc3cae..753ddb3 100644 --- a/src/tasks/bundler.rs +++ b/src/tasks/bundler.rs @@ -16,7 +16,7 @@ pub struct Bundle { /// Cache identifier for the bundle pub id: String, /// The Zenith bundle for this bundle - pub bundle: ZenithEthBundle, + pub bundle: SignetEthBundle, } /// Response from the tx-pool containing a list of bundles. diff --git a/src/tasks/submit.rs b/src/tasks/submit.rs index 0f1b0bf..558167f 100644 --- a/src/tasks/submit.rs +++ b/src/tasks/submit.rs @@ -18,14 +18,14 @@ use alloy::{ use eyre::{bail, eyre}; use init4_bin_base::deps::metrics::{counter, histogram}; use oauth2::TokenResponse; +use signet_types::{SignRequest, SignResponse}; +use signet_zenith::{ + BundleHelper::{self, BlockHeader, FillPermit2, submitCall}, + Zenith::IncorrectHostBlock, +}; use std::time::Instant; use tokio::{sync::mpsc, task::JoinHandle}; use tracing::{debug, error, instrument, trace}; -use zenith_types::{ - BundleHelper::{self, FillPermit2}, - SignRequest, SignResponse, - Zenith::IncorrectHostBlock, -}; macro_rules! spawn_provider_send { ($provider:expr, $tx:expr) => { @@ -127,7 +127,7 @@ impl SubmitTask { s: FixedBytes<32>, in_progress: &InProgressBlock, ) -> eyre::Result { - let data = zenith_types::BundleHelper::submitCall { fills, header, v, r, s }.abi_encode(); + let data = submitCall { fills, header, v, r, s }.abi_encode(); let sidecar = in_progress.encode_blob::().build()?; Ok(TransactionRequest::default() @@ -151,7 +151,7 @@ impl SubmitTask { ) -> eyre::Result { let (v, r, s) = extract_signature_components(&resp.sig); - let header = zenith_types::BundleHelper::BlockHeader { + let header = BlockHeader { hostBlockNumber: resp.req.host_block_number, rollupChainId: U256::from(self.config.ru_chain_id), gasLimit: resp.req.gas_limit, @@ -167,7 +167,7 @@ impl SubmitTask { .with_gas_limit(1_000_000); if let Err(TransportError::ErrorResp(e)) = - self.host_provider.call(&tx).block(BlockNumberOrTag::Pending.into()).await + self.host_provider.call(tx.clone()).block(BlockNumberOrTag::Pending.into()).await { error!( code = e.code, From b22999627ecc2641485ed791b9036a1001311996 Mon Sep 17 00:00:00 2001 From: dylan Date: Thu, 27 Mar 2025 15:18:21 -0600 Subject: [PATCH 12/14] fix: update bin-base import --- Cargo.toml | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 8d7a7a9..b9231f6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,11 +22,11 @@ name = "transaction-submitter" path = "bin/submit_transaction.rs" [dependencies] -init4-bin-base = { path = "../bin-base" } +init4-bin-base = { git = "https://github.com/init4tech/bin-base.git" } -signet-bundle = { git = "https://github.com/init4tech/signet-sdk", branch = "prestwich/update-trevm" } -signet-types = { git = "https://github.com/init4tech/signet-sdk", branch = "prestwich/update-trevm" } -signet-zenith = { git = "https://github.com/init4tech/signet-sdk", branch = "prestwich/update-trevm" } +signet-bundle = { git = "https://github.com/init4tech/signet-sdk", branch = "main" } +signet-types = { git = "https://github.com/init4tech/signet-sdk", branch = "main" } +signet-zenith = { git = "https://github.com/init4tech/signet-sdk", branch = "main" } alloy = { version = "0.12.6", features = [ "full", From bafe07f43132742c1f5074f942fb0779617bffa6 Mon Sep 17 00:00:00 2001 From: James Date: Mon, 7 Apr 2025 13:08:01 +0200 Subject: [PATCH 13/14] nits: doc updates --- src/config.rs | 9 ++++----- src/lib.rs | 2 +- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/src/config.rs b/src/config.rs index 6864b84..705a567 100644 --- a/src/config.rs +++ b/src/config.rs @@ -10,9 +10,8 @@ use alloy::{ }, }, }; -use std::{borrow::Cow, env, num, str::FromStr}; - use signet_zenith::Zenith; +use std::{borrow::Cow, env, num, str::FromStr}; // Keys for .env variables that need to be set to configure the builder. const HOST_CHAIN_ID: &str = "HOST_CHAIN_ID"; @@ -125,7 +124,7 @@ impl ConfigError { } } -/// Defines a full provider. +/// Type alias for the provider used in the builder. pub type Provider = FillProvider< JoinFill< JoinFill< @@ -138,7 +137,7 @@ pub type Provider = FillProvider< Ethereum, >; -/// Defines a provider type used to read-only. +/// Type alias for the provider used in the builder, without a wallet. pub type WalletlessProvider = FillProvider< JoinFill< Identity, @@ -148,7 +147,7 @@ pub type WalletlessProvider = FillProvider< Ethereum, >; -/// Defines a [`Zenith`] instance that is generic over [`Provider`] +/// A [`Zenith`] contract instance using [`Provider`] as the provider. pub type ZenithInstance

= Zenith::ZenithInstance<(), P, alloy::network::Ethereum>; impl BuilderConfig { diff --git a/src/lib.rs b/src/lib.rs index 3ab4b90..10eefeb 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -27,5 +27,5 @@ pub mod tasks; /// Utilities. pub mod utils; -/// Anonymous crate dependency imports. +// Anonymous import suppresses warnings about unused imports. use openssl as _; From e0c79f7552918e8f302af071b9dfd654b19f681c Mon Sep 17 00:00:00 2001 From: James Date: Mon, 7 Apr 2025 13:29:58 +0200 Subject: [PATCH 14/14] lint: fmt --- src/tasks/block.rs | 4 ++-- src/tasks/bundler.rs | 3 +-- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/src/tasks/block.rs b/src/tasks/block.rs index dd1a050..a0c9c87 100644 --- a/src/tasks/block.rs +++ b/src/tasks/block.rs @@ -9,12 +9,12 @@ use alloy::{ providers::Provider as _, rlp::Buf, }; +use signet_bundle::SignetEthBundle; +use signet_zenith::{Alloy2718Coder, encode_txns}; use std::time::{SystemTime, UNIX_EPOCH}; use std::{sync::OnceLock, time::Duration}; use tokio::{sync::mpsc, task::JoinHandle}; use tracing::{Instrument, debug, error, info, trace}; -use signet_zenith::{Alloy2718Coder, encode_txns}; -use signet_bundle::SignetEthBundle; /// Ethereum's slot time in seconds. pub const ETHEREUM_SLOT_TIME: u64 = 12; diff --git a/src/tasks/bundler.rs b/src/tasks/bundler.rs index 753ddb3..d3ec34e 100644 --- a/src/tasks/bundler.rs +++ b/src/tasks/bundler.rs @@ -8,8 +8,7 @@ use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel}; use tokio::task::JoinHandle; use tokio::time; use tracing::{Instrument, debug, trace}; -use zenith_types::ZenithEthBundle; - +use signet_bundle::SignetEthBundle; /// Holds a bundle from the cache with a unique ID and a Zenith bundle. #[derive(Debug, Clone, Serialize, Deserialize)] pub struct Bundle {