diff --git a/Cargo.toml b/Cargo.toml
index e4e8899..b9231f6 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -22,11 +22,21 @@ name = "transaction-submitter"
 path = "bin/submit_transaction.rs"
 
 [dependencies]
-init4-bin-base = "0.1.0"
-
-zenith-types = "0.13"
-
-alloy = { version = "0.7.3", features = ["full", "json-rpc", "signer-aws", "rpc-types-mev", "rlp"] }
+init4-bin-base = { git = "https://github.com/init4tech/bin-base.git" }
+
+signet-bundle = { git = "https://github.com/init4tech/signet-sdk", branch = "main" }
+signet-types = { git = "https://github.com/init4tech/signet-sdk", branch = "main" }
+signet-zenith = { git = "https://github.com/init4tech/signet-sdk", branch = "main" }
+
+alloy = { version = "0.12.6", features = [
+    "full",
+    "json-rpc",
+    "signer-aws",
+    "rpc-types-mev",
+    "rlp",
+    "node-bindings",
+    "serde",
+] }
 
 aws-config = "1.1.7"
 aws-sdk-kms = "1.15.0"
diff --git a/bin/submit_transaction.rs b/bin/submit_transaction.rs
index 3955a49..addb205 100644
--- a/bin/submit_transaction.rs
+++ b/bin/submit_transaction.rs
@@ -82,9 +82,8 @@ async fn connect_from_config() -> (Provider, Address, u64) {
     let signer = AwsSigner::new(client, kms_key_id.to_string(), Some(chain_id)).await.unwrap();
 
     let provider = ProviderBuilder::new()
-        .with_recommended_fillers()
         .wallet(EthereumWallet::from(signer))
-        .on_builtin(&rpc_url)
+        .connect(&rpc_url)
         .await
         .unwrap();
 
diff --git a/src/config.rs b/src/config.rs
index 1e8a6fa..705a567 100644
--- a/src/config.rs
+++ b/src/config.rs
@@ -9,10 +9,9 @@ use alloy::{
             WalletFiller,
         },
     },
-    transports::BoxTransport,
 };
+use signet_zenith::Zenith;
 use std::{borrow::Cow, env, num, str::FromStr};
-use zenith_types::Zenith;
 
 // Keys for .env variables that need to be set to configure the builder.
 const HOST_CHAIN_ID: &str = "HOST_CHAIN_ID";
@@ -125,7 +124,7 @@ impl ConfigError {
     }
 }
 
-/// Provider type used to read & write.
+/// Type alias for the provider used in the builder.
 pub type Provider = FillProvider<
     JoinFill<
         JoinFill<
@@ -134,26 +133,22 @@ pub type Provider = FillProvider<
         >,
         WalletFiller<EthereumWallet>,
     >,
-    RootProvider<BoxTransport>,
-    BoxTransport,
+    RootProvider,
     Ethereum,
 >;
 
-/// Provider type used to read-only.
+/// Type alias for the provider used in the builder, without a wallet.
 pub type WalletlessProvider = FillProvider<
     JoinFill<
        Identity,
        JoinFill<GasFiller, JoinFill<BlobGasFiller, JoinFill<NonceFiller, ChainIdFiller>>>,
     >,
-    RootProvider<BoxTransport>,
-    BoxTransport,
+    RootProvider,
     Ethereum,
 >;
 
-/// A Zenith contract instance, using some provider `P` (defaults to
-/// [`Provider`]).
-pub type ZenithInstance<P = Provider> =
-    Zenith::ZenithInstance<BoxTransport, P>;
+/// A [`Zenith`] contract instance using [`Provider`] as the provider.
+pub type ZenithInstance<P = Provider> = Zenith::ZenithInstance<(), P, alloy::network::Ethereum>;
 
 impl BuilderConfig {
     /// Load the builder configuration from environment variables.
@@ -210,32 +205,36 @@ impl BuilderConfig {
 
     /// Connect to the Rollup rpc provider.
     pub async fn connect_ru_provider(&self) -> Result<WalletlessProvider, ConfigError> {
-        ProviderBuilder::new()
-            .with_recommended_fillers()
-            .on_builtin(&self.ru_rpc_url)
+        let provider = ProviderBuilder::new()
+            .connect(&self.ru_rpc_url)
             .await
-            .map_err(Into::into)
+            .map_err(ConfigError::Provider)?;
+
+        Ok(provider)
     }
 
     /// Connect to the Host rpc provider.
     pub async fn connect_host_provider(&self) -> Result<Provider, ConfigError> {
         let builder_signer = self.connect_builder_signer().await?;
-        ProviderBuilder::new()
-            .with_recommended_fillers()
+        let provider = ProviderBuilder::new()
             .wallet(EthereumWallet::from(builder_signer))
-            .on_builtin(&self.host_rpc_url)
+            .connect(&self.host_rpc_url)
             .await
-            .map_err(Into::into)
+            .map_err(ConfigError::Provider)?;
+
+        Ok(provider)
     }
 
     /// Connect additional broadcast providers.
     pub async fn connect_additional_broadcast(
         &self,
-    ) -> Result<Vec<RootProvider<BoxTransport>>, ConfigError> {
-        let mut providers = Vec::with_capacity(self.tx_broadcast_urls.len());
+    ) -> Result<Vec<RootProvider>, ConfigError> {
+        let mut providers: Vec<RootProvider> =
+            Vec::with_capacity(self.tx_broadcast_urls.len());
         for url in self.tx_broadcast_urls.iter() {
             let provider =
-                ProviderBuilder::new().on_builtin(url).await.map_err(Into::<ConfigError>::into)?;
+                ProviderBuilder::new().connect(url).await.map_err(ConfigError::Provider)?;
+
             providers.push(provider);
         }
         Ok(providers)
@@ -278,5 +277,6 @@ pub fn load_url(key: &str) -> Result<Cow<'static, str>, ConfigError> {
 /// Load an address from an environment variable.
 pub fn load_address(key: &str) -> Result<Address, ConfigError> {
     let address = load_string(key)?;
-    Address::from_str(&address).map_err(Into::into)
+    Address::from_str(&address)
+        .map_err(|_| ConfigError::Var(format!("Invalid address format for {}", key)))
 }
diff --git a/src/lib.rs b/src/lib.rs
index 5d4e743..10eefeb 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -27,4 +27,5 @@ pub mod tasks;
 /// Utilities.
 pub mod utils;
 
+// Anonymous import suppresses warnings about unused imports.
 use openssl as _;
diff --git a/src/tasks/block.rs b/src/tasks/block.rs
index 6ba1e8e..a0c9c87 100644
--- a/src/tasks/block.rs
+++ b/src/tasks/block.rs
@@ -3,17 +3,18 @@ use super::oauth::Authenticator;
 use super::tx_poller::TxPoller;
 use crate::config::{BuilderConfig, WalletlessProvider};
 use alloy::{
-    consensus::{SidecarBuilder, SidecarCoder, TxEnvelope},
+    consensus::{SidecarBuilder, SidecarCoder, transaction::TxEnvelope},
     eips::eip2718::Decodable2718,
     primitives::{B256, Bytes, keccak256},
     providers::Provider as _,
     rlp::Buf,
 };
+use signet_bundle::SignetEthBundle;
+use signet_zenith::{Alloy2718Coder, encode_txns};
 use std::time::{SystemTime, UNIX_EPOCH};
 use std::{sync::OnceLock, time::Duration};
 use tokio::{sync::mpsc, task::JoinHandle};
 use tracing::{Instrument, debug, error, info, trace};
-use zenith_types::{Alloy2718Coder, ZenithEthBundle, encode_txns};
 
 /// Ethereum's slot time in seconds.
 pub const ETHEREUM_SLOT_TIME: u64 = 12;
@@ -180,11 +181,10 @@ impl BlockBuilder {
                     error!(error = %e, "error polling bundles");
                 }
             }
-        self.bundle_poller.evict();
     }
 
     /// Simulates a Zenith bundle against the rollup state
-    async fn simulate_bundle(&mut self, bundle: &ZenithEthBundle) -> eyre::Result<()> {
+    async fn simulate_bundle(&mut self, bundle: &SignetEthBundle) -> eyre::Result<()> {
         // TODO: Simulate bundles with the Simulation Engine
         // [ENG-672](https://linear.app/initiates/issue/ENG-672/add-support-for-bundles)
         debug!(hash = ?bundle.bundle.bundle_hash(), block_number = ?bundle.block_number(), "bundle simulations is not implemented yet - skipping simulation");
@@ -270,7 +270,7 @@ mod tests {
         rpc::types::{TransactionRequest, mev::EthSendBundle},
         signers::local::PrivateKeySigner,
     };
-    use zenith_types::ZenithEthBundle;
+    use signet_bundle::SignetEthBundle;
 
     /// Create a mock bundle for testing with a single transaction
     async fn create_mock_bundle(wallet: &EthereumWallet) -> Bundle {
@@ -295,7 +295,7 @@
             replacement_uuid: Some("replacement_uuid".to_owned()),
         };
 
-        let zenith_bundle = ZenithEthBundle { bundle: eth_bundle, host_fills: None };
+        let zenith_bundle = SignetEthBundle { bundle: eth_bundle, host_fills: None };
 
         Bundle { id: "mock_bundle".to_owned(), bundle: zenith_bundle }
     }
diff --git a/src/tasks/bundler.rs b/src/tasks/bundler.rs
index 670ecc5..d3ec34e 100644
--- a/src/tasks/bundler.rs
+++ b/src/tasks/bundler.rs
@@ -1,58 +1,62 @@
-//! Bundler service responsible for managing bundles.
-use super::oauth::Authenticator;
-
+//! Bundler service responsible for fetching bundles and sending them to the simulator.
 pub use crate::config::BuilderConfig;
-
+use crate::tasks::oauth::Authenticator;
 use oauth2::TokenResponse;
 use reqwest::Url;
 use serde::{Deserialize, Serialize};
-use std::collections::HashMap;
-use std::time::{Duration, Instant};
-use zenith_types::ZenithEthBundle;
-
-/// A bundle response from the tx-pool endpoint, containing a UUID and a
-/// [`ZenithEthBundle`].
+use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
+use tokio::task::JoinHandle;
+use tokio::time;
+use tracing::{Instrument, debug, trace};
+use signet_bundle::SignetEthBundle;
+
+/// Holds a bundle from the cache with a unique ID and a Zenith bundle.
 #[derive(Debug, Clone, Serialize, Deserialize)]
 pub struct Bundle {
-    /// The bundle id (a UUID)
+    /// Cache identifier for the bundle
     pub id: String,
-    /// The bundle itself
-    pub bundle: ZenithEthBundle,
+    /// The Zenith bundle for this bundle
+    pub bundle: SignetEthBundle,
 }
 
 /// Response from the tx-pool containing a list of bundles.
 #[derive(Debug, Clone, Serialize, Deserialize)]
 pub struct TxPoolBundleResponse {
-    /// the list of bundles
+    /// Bundle responses are available on the bundles property.
     pub bundles: Vec<Bundle>,
 }
 
-/// The BundlePoller polls the tx-pool for bundles and manages the seen bundles.
-#[derive(Debug)]
+/// The BundlePoller polls the tx-pool for bundles.
+#[derive(Debug, Clone)]
 pub struct BundlePoller {
-    /// Configuration
+    /// The builder configuration values.
     pub config: BuilderConfig,
-    /// [`Authenticator`] for fetching OAuth tokens
+    /// Authentication module that periodically fetches and stores auth tokens.
     pub authenticator: Authenticator,
-    /// Already seen bundle UUIDs
-    pub seen_uuids: HashMap<String, Instant>,
+    /// Defines the interval at which the bundler polls the tx-pool for bundles.
+    pub poll_interval_ms: u64,
 }
 
-/// Implements a poller for the block builder to pull bundles from the tx cache.
+/// Implements a poller for the block builder to pull bundles from the tx-pool.
 impl BundlePoller {
     /// Creates a new BundlePoller from the provided builder config.
     pub fn new(config: &BuilderConfig, authenticator: Authenticator) -> Self {
-        Self { config: config.clone(), authenticator, seen_uuids: HashMap::new() }
+        Self { config: config.clone(), authenticator, poll_interval_ms: 1000 }
     }
 
-    /// Fetches bundles from the transaction cache and returns the (oldest? random?) bundle in the cache.
-    pub async fn check_bundle_cache(&mut self) -> eyre::Result<Vec<Bundle>> {
-        let mut unique: Vec<Bundle> = Vec::new();
-
+    /// Creates a new BundlePoller from the provided builder config and with the specified poll interval in ms.
+    pub fn new_with_poll_interval_ms(
+        config: &BuilderConfig,
+        authenticator: Authenticator,
+        poll_interval_ms: u64,
+    ) -> Self {
+        Self { config: config.clone(), authenticator, poll_interval_ms }
+    }
+
+    /// Fetches bundles from the transaction cache and returns them.
+    pub async fn check_bundle_cache(&mut self) -> eyre::Result<Vec<Bundle>> {
         let bundle_url: Url = Url::parse(&self.config.tx_pool_url)?.join("bundles")?;
         let token = self.authenticator.fetch_oauth_token().await?;
 
-        // Add the token to the request headers
         let result = reqwest::Client::new()
             .get(bundle_url)
             .bearer_auth(token.access_token().secret())
             .send()
             .await?
             .error_for_status()?;
@@ -61,38 +65,52 @@ impl BundlePoller {
 
         let body = result.bytes().await?;
-        let bundles: TxPoolBundleResponse = serde_json::from_slice(&body)?;
-
-        bundles.bundles.iter().for_each(|bundle| {
-            self.check_seen_bundles(bundle.clone(), &mut unique);
-        });
+        let resp: TxPoolBundleResponse = serde_json::from_slice(&body)?;
 
-        Ok(unique)
+        Ok(resp.bundles)
     }
 
-    /// Checks if the bundle has been seen before and if not, adds it to the unique bundles list.
-    fn check_seen_bundles(&mut self, bundle: Bundle, unique: &mut Vec<Bundle>) {
-        self.seen_uuids.entry(bundle.id.clone()).or_insert_with(|| {
-            // add to the set of unique bundles
-            unique.push(bundle.clone());
-            Instant::now() + Duration::from_secs(self.config.tx_pool_cache_duration)
-        });
-    }
+    async fn task_future(mut self, outbound: UnboundedSender<Bundle>) {
+        loop {
+            let span = tracing::debug_span!("BundlePoller::loop", url = %self.config.tx_pool_url);
+
+            // Enter the span for the next check.
+            let _guard = span.enter();
 
-    /// Evicts expired bundles from the cache.
-    pub fn evict(&mut self) {
-        let expired_keys: Vec<String> = self
-            .seen_uuids
-            .iter()
-            .filter_map(
-                |(key, expiry)| {
-                    if expiry.elapsed().is_zero() { Some(key.clone()) } else { None }
-                },
-            )
-            .collect();
+            // Check this here to avoid making the web request if we know
+            // we don't need the results.
+            if outbound.is_closed() {
+                trace!("No receivers left, shutting down");
+                break;
+            }
+            // exit the span after the check.
+            drop(_guard);
 
-        for key in expired_keys {
-            self.seen_uuids.remove(&key);
+            match self.check_bundle_cache().instrument(span.clone()).await {
+                Ok(bundles) => {
+                    tracing::debug!(count = ?bundles.len(), "found bundles");
+                    for bundle in bundles.into_iter() {
+                        if let Err(err) = outbound.send(bundle) {
+                            tracing::error!(err = ?err, "Failed to send bundle - channel is dropped");
+                        }
+                    }
+                }
+                // If fetching was an error, we log and continue. We expect
+                // these to be transient network issues.
+                Err(e) => {
+                    debug!(error = %e, "Error fetching bundles");
+                }
+            }
+            time::sleep(time::Duration::from_millis(self.poll_interval_ms)).await;
         }
     }
+
+    /// Spawns a task that sends bundles it finds to its channel sender.
+    pub fn spawn(self) -> (UnboundedReceiver<Bundle>, JoinHandle<()>) {
+        let (outbound, inbound) = unbounded_channel();
+
+        let jh = tokio::spawn(self.task_future(outbound));
+
+        (inbound, jh)
+    }
 }
diff --git a/src/tasks/submit.rs b/src/tasks/submit.rs
index 0f1b0bf..558167f 100644
--- a/src/tasks/submit.rs
+++ b/src/tasks/submit.rs
@@ -18,14 +18,14 @@ use alloy::{
 use eyre::{bail, eyre};
 use init4_bin_base::deps::metrics::{counter, histogram};
 use oauth2::TokenResponse;
+use signet_types::{SignRequest, SignResponse};
+use signet_zenith::{
+    BundleHelper::{self, BlockHeader, FillPermit2, submitCall},
+    Zenith::IncorrectHostBlock,
+};
 use std::time::Instant;
 use tokio::{sync::mpsc, task::JoinHandle};
 use tracing::{debug, error, instrument, trace};
-use zenith_types::{
-    BundleHelper::{self, FillPermit2},
-    SignRequest, SignResponse,
-    Zenith::IncorrectHostBlock,
-};
 
 macro_rules! spawn_provider_send {
     ($provider:expr, $tx:expr) => {
@@ -127,7 +127,7 @@ impl SubmitTask {
         s: FixedBytes<32>,
         in_progress: &InProgressBlock,
     ) -> eyre::Result<TransactionRequest> {
-        let data = zenith_types::BundleHelper::submitCall { fills, header, v, r, s }.abi_encode();
+        let data = submitCall { fills, header, v, r, s }.abi_encode();
 
         let sidecar = in_progress.encode_blob::<SimpleCoder>().build()?;
         Ok(TransactionRequest::default()
@@ -151,7 +151,7 @@
     ) -> eyre::Result<TransactionRequest> {
         let (v, r, s) = extract_signature_components(&resp.sig);
 
-        let header = zenith_types::BundleHelper::BlockHeader {
+        let header = BlockHeader {
             hostBlockNumber: resp.req.host_block_number,
             rollupChainId: U256::from(self.config.ru_chain_id),
             gasLimit: resp.req.gas_limit,
@@ -167,7 +167,7 @@
             .with_gas_limit(1_000_000);
 
         if let Err(TransportError::ErrorResp(e)) =
-            self.host_provider.call(&tx).block(BlockNumberOrTag::Pending.into()).await
+            self.host_provider.call(tx.clone()).block(BlockNumberOrTag::Pending.into()).await
         {
             error!(
                 code = e.code,
diff --git a/src/tasks/tx_poller.rs b/src/tasks/tx_poller.rs
index 4a83577..abc9430 100644
--- a/src/tasks/tx_poller.rs
+++ b/src/tasks/tx_poller.rs
@@ -1,39 +1,94 @@
+//! Transaction service responsible for fetching and sending transactions to the simulator.
+use crate::config::BuilderConfig;
 use alloy::consensus::TxEnvelope;
 use eyre::Error;
 use reqwest::{Client, Url};
 use serde::{Deserialize, Serialize};
 use serde_json::from_slice;
+use tokio::{sync::mpsc, task::JoinHandle, time};
+use tracing::{Instrument, debug, trace};
 
-pub use crate::config::BuilderConfig;
-
-/// Response from the tx-pool endpoint.
+/// Models a response from the transaction pool.
 #[derive(Debug, Clone, Serialize, Deserialize)]
 pub struct TxPoolResponse {
+    /// Holds the transactions property as a list on the response.
     transactions: Vec<TxEnvelope>,
 }
 
 /// Implements a poller for the block builder to pull transactions from the transaction pool.
-#[derive(Debug)]
+#[derive(Debug, Clone)]
 pub struct TxPoller {
-    /// config for the builder
+    /// Config values from the Builder.
     pub config: BuilderConfig,
-    /// Reqwest client for fetching transactions from the tx-pool
+    /// Reqwest Client for fetching transactions from the cache.
     pub client: Client,
+    /// Defines the interval at which the service should poll the cache.
+    pub poll_interval_ms: u64,
 }
 
-/// TxPoller implements a poller that fetches unique transactions from the transaction pool.
+/// [`TxPoller`] implements a poller task that fetches unique transactions from the transaction pool.
 impl TxPoller {
-    /// returns a new TxPoller with the given config.
+    /// Returns a new [`TxPoller`] with the given config.
+    /// * Defaults to 1000ms poll interval (1s).
     pub fn new(config: &BuilderConfig) -> Self {
-        Self { config: config.clone(), client: Client::new() }
+        Self { config: config.clone(), client: Client::new(), poll_interval_ms: 1000 }
+    }
+
+    /// Returns a new [`TxPoller`] with the given config and cache polling interval in milliseconds.
+    pub fn new_with_poll_interval_ms(config: &BuilderConfig, poll_interval_ms: u64) -> Self {
+        Self { config: config.clone(), client: Client::new(), poll_interval_ms }
     }
 
-    /// polls the tx-pool for unique transactions and evicts expired transactions.
-    /// unique transactions that haven't been seen before are sent into the builder pipeline.
+    /// Polls the transaction cache for transactions.
     pub async fn check_tx_cache(&mut self) -> Result<Vec<TxEnvelope>, Error> {
         let url: Url = Url::parse(&self.config.tx_pool_url)?.join("transactions")?;
         let result = self.client.get(url).send().await?;
         let response: TxPoolResponse = from_slice(result.text().await?.as_bytes())?;
         Ok(response.transactions)
     }
+
+    async fn task_future(mut self, outbound: mpsc::UnboundedSender<TxEnvelope>) {
+        loop {
+            let span = tracing::debug_span!("TxPoller::loop", url = %self.config.tx_pool_url);
+
+            // Enter the span for the next check.
+            let _guard = span.enter();
+
+            // Check this here to avoid making the web request if we know
+            // we don't need the results.
+            if outbound.is_closed() {
+                trace!("No receivers left, shutting down");
+                break;
+            }
+            // exit the span after the check.
+            drop(_guard);
+
+            match self.check_tx_cache().instrument(span.clone()).await {
+                Ok(transactions) => {
+                    let _guard = span.entered();
+                    debug!(count = ?transactions.len(), "found transactions");
+                    for tx in transactions.into_iter() {
+                        if outbound.send(tx).is_err() {
+                            // If there are no receivers, we can shut down
+                            trace!("No receivers left, shutting down");
+                            break;
+                        }
+                    }
+                }
+                // If fetching was an error, we log and continue. We expect
+                // these to be transient network issues.
+                Err(e) => {
+                    debug!(error = %e, "Error fetching transactions");
+                }
+            }
+            time::sleep(time::Duration::from_millis(self.poll_interval_ms)).await;
+        }
+    }
+
+    /// Spawns a task that continuously polls the cache for transactions and sends any it finds to its sender.
+    pub fn spawn(self) -> (mpsc::UnboundedReceiver<TxEnvelope>, JoinHandle<()>) {
+        let (outbound, inbound) = mpsc::unbounded_channel();
+        let jh = tokio::spawn(self.task_future(outbound));
+        (inbound, jh)
+    }
 }