From 680ff9068a8ad3aef57c9a9dfa13f1f02eebc40b Mon Sep 17 00:00:00 2001 From: dylan Date: Tue, 3 Jun 2025 19:10:11 -0600 Subject: [PATCH 01/10] passes sim result to the submit tasks - adds a SimResult type that binds a BlockEnv to a BuiltBlock - passess that SimResult to the SubmitTask for gas calculations --- bin/submit_transaction.rs | 20 +++++- src/tasks/block/sim.rs | 83 +++++++++++++++++++------ src/tasks/env.rs | 5 +- src/tasks/submit.rs | 119 ++++++++++++++++++++++-------------- tests/block_builder_test.rs | 60 +----------------- 5 files changed, 160 insertions(+), 127 deletions(-) diff --git a/bin/submit_transaction.rs b/bin/submit_transaction.rs index 71b10f4..071f74b 100644 --- a/bin/submit_transaction.rs +++ b/bin/submit_transaction.rs @@ -1,3 +1,5 @@ +//! A simple transaction submitter that sends a transaction to a recipient address +//! on a regular interval for the purposes of roughly testing rollup mining. use alloy::{ network::{EthereumWallet, TransactionBuilder}, primitives::{Address, U256}, @@ -67,18 +69,29 @@ async fn main() { } } +/// Sends a transaction to the specified recipient address async fn send_transaction(provider: &HostProvider, recipient_address: Address) { // construct simple transaction to send ETH to a recipient + let nonce = match provider.get_transaction_count(provider.default_signer_address()).await { + Ok(count) => count, + Err(e) => { + error!(error = ?e, "failed to get transaction count"); + return; + } + }; + let tx = TransactionRequest::default() .with_from(provider.default_signer_address()) .with_to(recipient_address) .with_value(U256::from(1)) + .with_nonce(nonce) .with_gas_limit(30_000); // start timer to measure how long it takes to mine the transaction let dispatch_start_time: Instant = Instant::now(); // dispatch the transaction + debug!(?tx.nonce, "sending transaction with nonce"); let result = provider.send_transaction(tx).await.unwrap(); // wait for the transaction to mine @@ -95,10 +108,13 @@ async fn send_transaction(provider: &HostProvider, recipient_address: Address) { } }; - let hash = receipt.transaction_hash.to_string(); + record_metrics(dispatch_start_time, receipt); +} - // record metrics for how long it took to mine the transaction +/// Record metrics for how long it took to mine the transaction +fn record_metrics(dispatch_start_time: Instant, receipt: alloy::rpc::types::TransactionReceipt) { let mine_time = dispatch_start_time.elapsed().as_secs(); + let hash = receipt.transaction_hash.to_string(); debug!(success = receipt.status(), mine_time, hash, "transaction mined"); histogram!("txn_submitter.tx_mine_time").record(mine_time as f64); } diff --git a/src/tasks/block/sim.rs b/src/tasks/block/sim.rs index ce7ffb3..e59e799 100644 --- a/src/tasks/block/sim.rs +++ b/src/tasks/block/sim.rs @@ -9,7 +9,7 @@ use init4_bin_base::{ }; use signet_sim::{BlockBuild, BuiltBlock, SimCache}; use signet_types::constants::SignetSystemConstants; -use std::time::{Duration, Instant}; +use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; use tokio::{ sync::{ mpsc::{self}, @@ -17,6 +17,7 @@ use tokio::{ }, task::JoinHandle, }; +use tracing::trace; use trevm::revm::{ context::BlockEnv, database::{AlloyDB, WrapDatabaseAsync}, @@ -34,11 +35,19 @@ pub struct Simulator { pub config: BuilderConfig, /// A provider that cannot sign transactions, used for interacting with the rollup. pub ru_provider: RuProvider, - /// The block configuration environment on which to simulate pub block_env: watch::Receiver>, } +/// SimResult bundles a BuiltBlock to the BlockEnv it was simulated against. +#[derive(Debug, Clone)] +pub struct SimResult { + /// The block built with the successfully simulated transactions + pub block: BuiltBlock, + /// The block environment the transactions were simulated against. + pub env: BlockEnv, +} + impl Simulator { /// Creates a new `Simulator` instance. /// @@ -46,6 +55,7 @@ impl Simulator { /// /// - `config`: The configuration for the builder. /// - `ru_provider`: A provider for interacting with the rollup. + /// - `block_env`: A receiver for the block environment to simulate against. /// /// # Returns /// @@ -70,6 +80,7 @@ impl Simulator { /// - `constants`: The system constants for the rollup. /// - `sim_items`: The simulation cache containing transactions and bundles. /// - `finish_by`: The deadline by which the block must be built. + /// - `block_env`: The block environment to simulate against. /// /// # Returns /// @@ -79,14 +90,22 @@ impl Simulator { constants: SignetSystemConstants, sim_items: SimCache, finish_by: Instant, - block: BlockEnv, + block_env: BlockEnv, ) -> eyre::Result { + debug!( + block_number = block_env.number, + deadline = ?self.instant_to_timestamp(finish_by), + tx_count= sim_items.len(), + "starting block build", + ); + let db = self.create_db().await.unwrap(); + let block_build: BlockBuild<_, NoOpInspector> = BlockBuild::new( db, constants, self.config.cfg_env(), - block, + block_env, finish_by, self.config.concurrency_limit, sim_items, @@ -94,7 +113,11 @@ impl Simulator { ); let built_block = block_build.build().await; - debug!(block_number = ?built_block.block_number(), "finished building block"); + debug!( + tx_count = built_block.tx_count(), + block_number = ?built_block.block_number(), + "block simulation completed", + ); Ok(built_block) } @@ -115,7 +138,7 @@ impl Simulator { self, constants: SignetSystemConstants, cache: SimCache, - submit_sender: mpsc::UnboundedSender, + submit_sender: mpsc::UnboundedSender, ) -> JoinHandle<()> { debug!("starting simulator task"); @@ -140,26 +163,23 @@ impl Simulator { mut self, constants: SignetSystemConstants, cache: SimCache, - submit_sender: mpsc::UnboundedSender, + submit_sender: mpsc::UnboundedSender, ) { loop { - let sim_cache = cache.clone(); - let finish_by = self.calculate_deadline(); - // Wait for the block environment to be set if self.block_env.changed().await.is_err() { error!("block_env channel closed"); return; } - // If no env, skip this run let Some(block_env) = self.block_env.borrow_and_update().clone() else { return }; - debug!(block_env = ?block_env, "building on block env"); - match self.handle_build(constants, sim_cache, finish_by, block_env).await { + let finish_by = self.calculate_deadline(); + let sim_cache = cache.clone(); + match self.handle_build(constants, sim_cache, finish_by, block_env.clone()).await { Ok(block) => { - debug!(block = ?block, "built block"); - let _ = submit_sender.send(block); + debug!(block = ?block.block_number(), tx_count = block.transactions().len(), "built block"); + let _ = submit_sender.send(SimResult { block, env: block_env }); } Err(e) => { error!(err = %e, "failed to build block"); @@ -178,17 +198,25 @@ impl Simulator { pub fn calculate_deadline(&self) -> Instant { // Get the current timepoint within the slot. let timepoint = self.slot_calculator().current_timepoint_within_slot(); + trace!(timepoint, "current timepoint within slot"); // We have the timepoint in seconds into the slot. To find out what's // remaining, we need to subtract it from the slot duration let remaining = self.slot_calculator().slot_duration() - timepoint; + trace!(remaining, "time remaining in slot"); // We add a 1500 ms buffer to account for sequencer stopping signing. - - let candidate = + let deadline = Instant::now() + Duration::from_secs(remaining) - Duration::from_millis(1500); + trace!(deadline = ?self.instant_to_timestamp(deadline), "calculated deadline for block simulation"); + + let buffered_deadline = deadline.max(Instant::now()); + trace!(?buffered_deadline, "final deadline for block simulation"); - candidate.max(Instant::now()) + let timestamp = self.instant_to_timestamp(buffered_deadline); + trace!(?timestamp, "deadline converted to timestamp"); + + buffered_deadline } /// Creates an `AlloyDB` instance from the rollup provider. @@ -217,4 +245,23 @@ impl Simulator { let wrapped_db: AlloyDatabaseProvider = WrapDatabaseAsync::new(alloy_db).unwrap(); Some(wrapped_db) } + + /// Converts an `Instant` to a UNIX timestamp in seconds and milliseconds. + pub fn instant_to_timestamp(&self, instant: Instant) -> (u64, u128) { + let now_instant = Instant::now(); + let now_system = SystemTime::now(); + + let duration_from_now = now_instant.duration_since(instant); + + // Subtract that duration from the system time + let target_system_time = now_system - duration_from_now; + + let duration_since_epoch = + target_system_time.duration_since(UNIX_EPOCH).expect("Time went backwards"); + + let seconds = duration_since_epoch.as_secs(); + let milliseconds = duration_since_epoch.as_millis(); + + (seconds, milliseconds) + } } diff --git a/src/tasks/env.rs b/src/tasks/env.rs index 448d2c8..a2259ca 100644 --- a/src/tasks/env.rs +++ b/src/tasks/env.rs @@ -66,7 +66,6 @@ impl EnvTask { let Some(block) = blocks.last() else { // This case occurs when there are no changes to the block, // so we do nothing. - debug!("empty filter changes"); continue; }; let span = info_span!("EnvTask::task_fut::loop", hash = %block, number = tracing::field::Empty); @@ -96,10 +95,10 @@ impl EnvTask { } }; span.record("number", previous.number); - debug!("retrieved latest block"); let env = self.construct_block_env(&previous); - debug!(?env, "constructed block env"); + debug!(block_number = ?env.number, env.basefee, "constructed latest block env"); + if sender.send(Some(env)).is_err() { // The receiver has been dropped, so we can stop the task. debug!("receiver dropped, stopping task"); diff --git a/src/tasks/submit.rs b/src/tasks/submit.rs index cab53cb..a081239 100644 --- a/src/tasks/submit.rs +++ b/src/tasks/submit.rs @@ -25,14 +25,13 @@ use signet_zenith::{ Zenith::{self, IncorrectHostBlock}, }; use std::time::{Instant, UNIX_EPOCH}; -use tokio::{sync::mpsc, task::JoinHandle}; +use tokio::{ + sync::mpsc::{self}, + task::JoinHandle, +}; +use trevm::revm::context::BlockEnv; -/// Base maximum fee per gas to use as a starting point for retry bumps -pub const BASE_FEE_PER_GAS: u128 = 10 * GWEI_TO_WEI as u128; -/// Base max priority fee per gas to use as a starting point for retry bumps -pub const BASE_MAX_PRIORITY_FEE_PER_GAS: u128 = 2 * GWEI_TO_WEI as u128; -/// Base maximum fee per blob gas to use as a starting point for retry bumps -pub const BASE_MAX_FEE_PER_BLOB_GAS: u128 = GWEI_TO_WEI as u128; +use super::block::sim::SimResult; macro_rules! spawn_provider_send { ($provider:expr, $tx:expr) => { @@ -222,7 +221,7 @@ impl SubmitTask { }) } - /// Builds blob transaction and encodes the sidecar for it from the provided header and signature values + /// Encodes the sidecar and then builds the 4844 blob transaction from the provided header and signature values. fn build_blob_tx( &self, fills: Vec, @@ -245,8 +244,9 @@ impl SubmitTask { retry_count: usize, resp: &SignResponse, block: &BuiltBlock, + block_env: &BlockEnv, ) -> eyre::Result { - let tx = self.prepare_tx(retry_count, resp, block).await?; + let tx = self.prepare_tx(retry_count, resp, block, block_env).await?; self.send_transaction(resp, tx).await } @@ -258,9 +258,11 @@ impl SubmitTask { retry_count: usize, resp: &SignResponse, block: &BuiltBlock, + block_env: &BlockEnv, ) -> Result { // Create the transaction request with the signature values - let tx: TransactionRequest = self.new_tx_request(retry_count, resp, block).await?; + let tx: TransactionRequest = + self.new_tx_request(retry_count, resp, block, block_env).await?; // Simulate the transaction with a call to the host provider and report any errors if let Err(err) = self.sim_with_call(&tx).await { @@ -288,6 +290,7 @@ impl SubmitTask { retry_count: usize, resp: &SignResponse, block: &BuiltBlock, + block_env: &BlockEnv, ) -> Result { // manually retrieve nonce let nonce = @@ -297,14 +300,8 @@ impl SubmitTask { // Extract the signature components from the response let (v, r, s) = extract_signature_components(&resp.sig); - // Calculate gas limits based on retry attempts let (max_fee_per_gas, max_priority_fee_per_gas, max_fee_per_blob_gas) = - calculate_gas_limits( - retry_count, - BASE_FEE_PER_GAS, - BASE_MAX_PRIORITY_FEE_PER_GAS, - BASE_MAX_FEE_PER_BLOB_GAS, - ); + calculate_gas(retry_count, block_env); // Build the block header let header: BlockHeader = BlockHeader { @@ -386,6 +383,7 @@ impl SubmitTask { &self, retry_count: usize, block: &BuiltBlock, + block_env: &BlockEnv, ) -> eyre::Result { info!(retry_count, txns = block.tx_count(), "handling inbound block"); let Ok(sig_request) = self.construct_sig_request(block).await.inspect_err(|e| { @@ -397,18 +395,20 @@ impl SubmitTask { debug!( host_block_number = %sig_request.host_block_number, ru_chain_id = %sig_request.ru_chain_id, + tx_count = block.tx_count(), "constructed signature request for host block" ); let signed = self.quincey.get_signature(&sig_request).await?; - self.submit_transaction(retry_count, &signed, block).await + self.submit_transaction(retry_count, &signed, block, block_env).await } /// Handles the retry logic for the inbound block. async fn retrying_handle_inbound( &self, block: &BuiltBlock, + block_env: &BlockEnv, retry_limit: usize, ) -> eyre::Result { let mut retries = 0; @@ -418,11 +418,11 @@ impl SubmitTask { // Retry loop let result = loop { - // Log the retry attempt let span = debug_span!("SubmitTask::retrying_handle_inbound", retries); let inbound_result = - match self.handle_inbound(retries, block).instrument(span.clone()).await { + match self.handle_inbound(retries, block, block_env).instrument(span.clone()).await + { Ok(control_flow) => control_flow, Err(err) => { // Delay until next slot if we get a 403 error @@ -431,6 +431,7 @@ impl SubmitTask { debug!(slot_number, "403 detected - skipping slot"); return Ok(ControlFlow::Skip); } else { + // Otherwise, log error and retry error!(error = %err, "error handling inbound block"); } @@ -500,29 +501,33 @@ impl SubmitTask { /// Task future for the submit task /// NB: This task assumes that the simulator will only send it blocks for /// slots that it's assigned. - async fn task_future(self, mut inbound: mpsc::UnboundedReceiver) { + async fn task_future(self, mut inbound: mpsc::UnboundedReceiver) { // Holds a reference to the last block we attempted to submit let mut last_block_attempted: u64 = 0; loop { // Wait to receive a new block - let Some(block) = inbound.recv().await else { + let Some(result) = inbound.recv().await else { debug!("upstream task gone"); break; }; - debug!(block_number = block.block_number(), ?block, "submit channel received block"); + + debug!(block_number = result.block.block_number(), "submit channel received block"); // Only attempt each block number once - if block.block_number() == last_block_attempted { - debug!("block number is unchanged from last attempt - skipping"); + if result.block.block_number() == last_block_attempted { + debug!( + block_number = result.block.block_number(), + "block number is unchanged from last attempt - skipping" + ); continue; } // This means we have encountered a new block, so reset the last block attempted - last_block_attempted = block.block_number(); + last_block_attempted = result.block.block_number(); debug!(last_block_attempted, "resetting last block attempted"); - if self.retrying_handle_inbound(&block, 3).await.is_err() { + if self.retrying_handle_inbound(&result.block, &result.env, 3).await.is_err() { debug!("error handling inbound block"); continue; }; @@ -530,36 +535,60 @@ impl SubmitTask { } /// Spawns the in progress block building task - pub fn spawn(self) -> (mpsc::UnboundedSender, JoinHandle<()>) { - let (sender, inbound) = mpsc::unbounded_channel(); + pub fn spawn(self) -> (mpsc::UnboundedSender, JoinHandle<()>) { + let (sender, inbound) = mpsc::unbounded_channel::(); let handle = tokio::spawn(self.task_future(inbound)); (sender, handle) } } -// Returns gas parameters based on retry counts. -fn calculate_gas_limits( +/// Calculates gas parameters based on the block environment and retry count. +fn calculate_gas(retry_count: usize, block_env: &BlockEnv) -> (u128, u128, u128) { + let fallback_blob_basefee = 500; + + match block_env.blob_excess_gas_and_price { + Some(excess) => { + if excess.blob_gasprice == 0 { + warn!("blob excess gas price is zero, using default blob base fee"); + return bump_gas_from_retries( + retry_count, + block_env.basefee, + fallback_blob_basefee, + ); + } + + bump_gas_from_retries(retry_count, block_env.basefee, excess.blob_gasprice) + } + None => { + warn!("no blob excess gas and price in block env, using defaults"); + bump_gas_from_retries(retry_count, block_env.basefee, fallback_blob_basefee) + } + } +} + +/// Bumps the gas parameters based on the retry count, base fee, and blob base fee. +pub fn bump_gas_from_retries( retry_count: usize, - base_max_fee_per_gas: u128, - base_max_priority_fee_per_gas: u128, - base_max_fee_per_blob_gas: u128, + basefee: u64, + blob_basefee: u128, ) -> (u128, u128, u128) { - let bump_multiplier = 1150u128.pow(retry_count as u32); // 15% bump - let blob_bump_multiplier = 2000u128.pow(retry_count as u32); // 100% bump (double each time) for blob gas - let bump_divisor = 1000u128.pow(retry_count as u32); + const PRIORITY_FEE_BASE: u64 = 2 * GWEI_TO_WEI; + const BASE_MULTIPLIER: u128 = 2; + const BLOB_MULTIPLIER: u128 = 2; + + // Increase priority fee by 20% per retry + let priority_fee = + PRIORITY_FEE_BASE * (12u64.pow(retry_count as u32) / 10u64.pow(retry_count as u32)); - let max_fee_per_gas = base_max_fee_per_gas * bump_multiplier / bump_divisor; - let max_priority_fee_per_gas = base_max_priority_fee_per_gas * bump_multiplier / bump_divisor; - let max_fee_per_blob_gas = base_max_fee_per_blob_gas * blob_bump_multiplier / bump_divisor; + // Max fee includes basefee + priority + headroom (double basefee, etc.) + let max_fee_per_gas = (basefee as u128) * BASE_MULTIPLIER + (priority_fee as u128); + let max_fee_per_blob_gas = blob_basefee * BLOB_MULTIPLIER * (retry_count as u128 + 1); debug!( retry_count, - max_fee_per_gas, - max_priority_fee_per_gas, - max_fee_per_blob_gas, - "calculated bumped gas parameters" + max_fee_per_gas, priority_fee, max_fee_per_blob_gas, "calculated bumped gas parameters" ); - (max_fee_per_gas, max_priority_fee_per_gas, max_fee_per_blob_gas) + (max_fee_per_gas, priority_fee as u128, max_fee_per_blob_gas) } diff --git a/tests/block_builder_test.rs b/tests/block_builder_test.rs index a88869b..1317a26 100644 --- a/tests/block_builder_test.rs +++ b/tests/block_builder_test.rs @@ -8,13 +8,12 @@ use alloy::{ signers::local::PrivateKeySigner, }; use builder::{ - tasks::{block::sim::Simulator, cache::CacheTask}, + tasks::block::sim::Simulator, test_utils::{new_signed_tx, setup_logging, setup_test_config, test_block_env}, }; use signet_sim::{SimCache, SimItem}; use signet_types::constants::SignetSystemConstants; use std::time::{Duration, Instant}; -use tokio::{sync::mpsc::unbounded_channel, time::timeout}; /// Tests the `handle_build` method of the `Simulator`. /// @@ -71,60 +70,3 @@ async fn test_handle_build() { assert!(got.is_ok()); assert!(got.unwrap().tx_count() == 2); } - -/// Tests the full block builder loop, including transaction ingestion and block simulation. -/// -/// This test sets up a simulated environment using Anvil, creates a block builder, -/// and verifies that the builder can process incoming transactions and produce a block -/// within a specified timeout. -#[ignore = "integration test"] -#[tokio::test(flavor = "multi_thread", worker_threads = 2)] -async fn test_spawn() { - setup_logging(); - - // Make a test config - let config = setup_test_config().unwrap(); - let constants = SignetSystemConstants::pecorino(); - - // Create an anvil instance for testing - let anvil_instance = Anvil::new().chain_id(signet_constants::pecorino::RU_CHAIN_ID).spawn(); - - // Create a wallet - let keys = anvil_instance.keys(); - let test_key_0 = PrivateKeySigner::from_signing_key(keys[0].clone().into()); - let test_key_1 = PrivateKeySigner::from_signing_key(keys[1].clone().into()); - - // Plumb inputs for the test setup - let (tx_sender, tx_receiver) = unbounded_channel(); - let (_, bundle_receiver) = unbounded_channel(); - let (block_sender, mut block_receiver) = unbounded_channel(); - - let env_task = config.env_task(); - let (block_env, _env_jh) = env_task.spawn(); - - let cache_task = CacheTask::new(block_env.clone(), bundle_receiver, tx_receiver); - let (sim_cache, _cache_jh) = cache_task.spawn(); - - // Create a rollup provider - let ru_provider = RootProvider::::new_http(anvil_instance.endpoint_url()); - - let sim = Simulator::new(&config, ru_provider.clone(), block_env); - - // Finally, Kick off the block builder task. - sim.spawn_simulator_task(constants, sim_cache.clone(), block_sender); - - // Feed in transactions to the tx_sender and wait for the block to be simulated - let tx_1 = new_signed_tx(&test_key_0, 0, U256::from(1_f64), 11_000).unwrap(); - let tx_2 = new_signed_tx(&test_key_1, 0, U256::from(2_f64), 10_000).unwrap(); - tx_sender.send(tx_1).unwrap(); - tx_sender.send(tx_2).unwrap(); - - // Wait for a block with timeout - let result = timeout(Duration::from_secs(5), block_receiver.recv()).await; - assert!(result.is_ok(), "Did not receive block within 5 seconds"); - - // Assert on the block - let block = result.unwrap(); - assert!(block.is_some(), "Block channel closed without receiving a block"); - assert!(block.unwrap().tx_count() == 2); // TODO: Why is this failing? I'm seeing EVM errors but haven't tracked them down yet. -} From 6f31d1e0964131ea200c3f4007ae5ac50cf9407c Mon Sep 17 00:00:00 2001 From: dylan Date: Mon, 9 Jun 2025 22:00:52 -0600 Subject: [PATCH 02/10] better logging --- src/tasks/mod.rs | 2 +- src/tasks/submit.rs | 16 +++++++++++----- 2 files changed, 12 insertions(+), 6 deletions(-) diff --git a/src/tasks/mod.rs b/src/tasks/mod.rs index eee57d5..4715d74 100644 --- a/src/tasks/mod.rs +++ b/src/tasks/mod.rs @@ -10,5 +10,5 @@ pub mod metrics; /// Tx submission task pub mod submit; -/// Constructs the simualtion environment. +/// Constructs the simulation environment. pub mod env; diff --git a/src/tasks/submit.rs b/src/tasks/submit.rs index a081239..ca6bc9c 100644 --- a/src/tasks/submit.rs +++ b/src/tasks/submit.rs @@ -413,6 +413,7 @@ impl SubmitTask { ) -> eyre::Result { let mut retries = 0; let building_start_time = Instant::now(); + let (current_slot, start, end) = self.calculate_slot_window(); debug!(current_slot, start, end, "calculating target slot window"); @@ -467,9 +468,15 @@ impl SubmitTask { }; // This is reached when `Done` or `Skip` is returned - histogram!("builder.block_build_time") - .record(building_start_time.elapsed().as_millis() as f64); - info!(?result, "finished block building"); + let elapsed = building_start_time.elapsed().as_millis() as f64; + histogram!("builder.block_build_time").record(elapsed); + info!( + ?result, + tx_count = block.tx_count(), + block_number = block.block_number(), + build_time = ?elapsed, + "finished block building" + ); Ok(result) } @@ -508,10 +515,9 @@ impl SubmitTask { loop { // Wait to receive a new block let Some(result) = inbound.recv().await else { - debug!("upstream task gone"); + debug!("upstream task gone - exiting submit task"); break; }; - debug!(block_number = result.block.block_number(), "submit channel received block"); // Only attempt each block number once From cffb527db3b1d0e6e894dcf3f0e646e0563ee57f Mon Sep 17 00:00:00 2001 From: dylan Date: Mon, 9 Jun 2025 22:38:46 -0600 Subject: [PATCH 03/10] cleanup --- src/tasks/block/sim.rs | 28 +++------------------------- src/tasks/submit.rs | 22 +++------------------- 2 files changed, 6 insertions(+), 44 deletions(-) diff --git a/src/tasks/block/sim.rs b/src/tasks/block/sim.rs index e59e799..22a5e69 100644 --- a/src/tasks/block/sim.rs +++ b/src/tasks/block/sim.rs @@ -9,7 +9,7 @@ use init4_bin_base::{ }; use signet_sim::{BlockBuild, BuiltBlock, SimCache}; use signet_types::constants::SignetSystemConstants; -use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; +use std::time::{Duration, Instant}; use tokio::{ sync::{ mpsc::{self}, @@ -94,7 +94,7 @@ impl Simulator { ) -> eyre::Result { debug!( block_number = block_env.number, - deadline = ?self.instant_to_timestamp(finish_by), + ?finish_by, tx_count= sim_items.len(), "starting block build", ); @@ -208,14 +208,11 @@ impl Simulator { // We add a 1500 ms buffer to account for sequencer stopping signing. let deadline = Instant::now() + Duration::from_secs(remaining) - Duration::from_millis(1500); - trace!(deadline = ?self.instant_to_timestamp(deadline), "calculated deadline for block simulation"); + trace!(?deadline, "calculated deadline for block simulation"); let buffered_deadline = deadline.max(Instant::now()); trace!(?buffered_deadline, "final deadline for block simulation"); - let timestamp = self.instant_to_timestamp(buffered_deadline); - trace!(?timestamp, "deadline converted to timestamp"); - buffered_deadline } @@ -245,23 +242,4 @@ impl Simulator { let wrapped_db: AlloyDatabaseProvider = WrapDatabaseAsync::new(alloy_db).unwrap(); Some(wrapped_db) } - - /// Converts an `Instant` to a UNIX timestamp in seconds and milliseconds. - pub fn instant_to_timestamp(&self, instant: Instant) -> (u64, u128) { - let now_instant = Instant::now(); - let now_system = SystemTime::now(); - - let duration_from_now = now_instant.duration_since(instant); - - // Subtract that duration from the system time - let target_system_time = now_system - duration_from_now; - - let duration_since_epoch = - target_system_time.duration_since(UNIX_EPOCH).expect("Time went backwards"); - - let seconds = duration_since_epoch.as_secs(); - let milliseconds = duration_since_epoch.as_millis(); - - (seconds, milliseconds) - } } diff --git a/src/tasks/submit.rs b/src/tasks/submit.rs index ca6bc9c..7a42d56 100644 --- a/src/tasks/submit.rs +++ b/src/tasks/submit.rs @@ -509,9 +509,6 @@ impl SubmitTask { /// NB: This task assumes that the simulator will only send it blocks for /// slots that it's assigned. async fn task_future(self, mut inbound: mpsc::UnboundedReceiver) { - // Holds a reference to the last block we attempted to submit - let mut last_block_attempted: u64 = 0; - loop { // Wait to receive a new block let Some(result) = inbound.recv().await else { @@ -520,23 +517,10 @@ impl SubmitTask { }; debug!(block_number = result.block.block_number(), "submit channel received block"); - // Only attempt each block number once - if result.block.block_number() == last_block_attempted { - debug!( - block_number = result.block.block_number(), - "block number is unchanged from last attempt - skipping" - ); - continue; + if let Err(e) = self.retrying_handle_inbound(&result.block, &result.env, 3).await { + error!(error = %e, "error handling inbound block"); + continue; } - - // This means we have encountered a new block, so reset the last block attempted - last_block_attempted = result.block.block_number(); - debug!(last_block_attempted, "resetting last block attempted"); - - if self.retrying_handle_inbound(&result.block, &result.env, 3).await.is_err() { - debug!("error handling inbound block"); - continue; - }; } } From 6ee8b7793cd70f4cda4819d12820a848251bd92a Mon Sep 17 00:00:00 2001 From: dylan Date: Mon, 9 Jun 2025 22:42:15 -0600 Subject: [PATCH 04/10] fmt --- src/tasks/block/sim.rs | 2 +- src/tasks/submit.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/tasks/block/sim.rs b/src/tasks/block/sim.rs index 22a5e69..9a097cd 100644 --- a/src/tasks/block/sim.rs +++ b/src/tasks/block/sim.rs @@ -95,7 +95,7 @@ impl Simulator { debug!( block_number = block_env.number, ?finish_by, - tx_count= sim_items.len(), + tx_count = sim_items.len(), "starting block build", ); diff --git a/src/tasks/submit.rs b/src/tasks/submit.rs index 7a42d56..cc0421c 100644 --- a/src/tasks/submit.rs +++ b/src/tasks/submit.rs @@ -519,7 +519,7 @@ impl SubmitTask { if let Err(e) = self.retrying_handle_inbound(&result.block, &result.env, 3).await { error!(error = %e, "error handling inbound block"); - continue; + continue; } } } From 031e8689a41429646db501f18e4e98cf7d1d8297 Mon Sep 17 00:00:00 2001 From: dylan Date: Mon, 9 Jun 2025 22:47:55 -0600 Subject: [PATCH 05/10] remove verbose logging --- src/tasks/block/sim.rs | 4 ---- 1 file changed, 4 deletions(-) diff --git a/src/tasks/block/sim.rs b/src/tasks/block/sim.rs index 9a097cd..1bcc2a0 100644 --- a/src/tasks/block/sim.rs +++ b/src/tasks/block/sim.rs @@ -198,20 +198,16 @@ impl Simulator { pub fn calculate_deadline(&self) -> Instant { // Get the current timepoint within the slot. let timepoint = self.slot_calculator().current_timepoint_within_slot(); - trace!(timepoint, "current timepoint within slot"); // We have the timepoint in seconds into the slot. To find out what's // remaining, we need to subtract it from the slot duration let remaining = self.slot_calculator().slot_duration() - timepoint; - trace!(remaining, "time remaining in slot"); // We add a 1500 ms buffer to account for sequencer stopping signing. let deadline = Instant::now() + Duration::from_secs(remaining) - Duration::from_millis(1500); - trace!(?deadline, "calculated deadline for block simulation"); let buffered_deadline = deadline.max(Instant::now()); - trace!(?buffered_deadline, "final deadline for block simulation"); buffered_deadline } From 79ca0dac0b9e610847d3a36a4da279b105216aeb Mon Sep 17 00:00:00 2001 From: dylan Date: Mon, 9 Jun 2025 22:48:37 -0600 Subject: [PATCH 06/10] clippy --- src/tasks/block/sim.rs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/src/tasks/block/sim.rs b/src/tasks/block/sim.rs index 1bcc2a0..7e274a4 100644 --- a/src/tasks/block/sim.rs +++ b/src/tasks/block/sim.rs @@ -17,7 +17,6 @@ use tokio::{ }, task::JoinHandle, }; -use tracing::trace; use trevm::revm::{ context::BlockEnv, database::{AlloyDB, WrapDatabaseAsync}, @@ -207,9 +206,7 @@ impl Simulator { let deadline = Instant::now() + Duration::from_secs(remaining) - Duration::from_millis(1500); - let buffered_deadline = deadline.max(Instant::now()); - - buffered_deadline + deadline.max(Instant::now()) } /// Creates an `AlloyDB` instance from the rollup provider. From 0eec4400401f8d5b3c45c340d7021d7388d2b67f Mon Sep 17 00:00:00 2001 From: dylan Date: Mon, 9 Jun 2025 23:08:41 -0600 Subject: [PATCH 07/10] fix: don't submit empty blocks --- src/tasks/submit.rs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/tasks/submit.rs b/src/tasks/submit.rs index cc0421c..bcf00df 100644 --- a/src/tasks/submit.rs +++ b/src/tasks/submit.rs @@ -517,6 +517,12 @@ impl SubmitTask { }; debug!(block_number = result.block.block_number(), "submit channel received block"); + // Don't submit empty blocks + if result.block.is_empty() { + debug!("received empty block - skipping"); + continue; + } + if let Err(e) = self.retrying_handle_inbound(&result.block, &result.env, 3).await { error!(error = %e, "error handling inbound block"); continue; From 3e98b2e19c8dbb83085356eb84a285a0affc2dde Mon Sep 17 00:00:00 2001 From: dylan Date: Wed, 11 Jun 2025 14:21:31 -0600 Subject: [PATCH 08/10] bump trevm to include bugfix --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index f3b6605..391a71f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -33,7 +33,7 @@ signet-tx-cache = { git = "https://github.com/init4tech/signet-sdk", branch = "m signet-types = { git = "https://github.com/init4tech/signet-sdk", branch = "main" } signet-zenith = { git = "https://github.com/init4tech/signet-sdk", branch = "main" } -trevm = { version = "0.23.4", features = ["concurrent-db", "test-utils"] } +trevm = { version = "0.23.6", features = ["concurrent-db", "test-utils"] } alloy = { version = "1.0.5", features = [ "full", From f7defa38df83fa749af6b4ad37f7ebdc62b96983 Mon Sep 17 00:00:00 2001 From: dylan Date: Wed, 11 Jun 2025 14:23:54 -0600 Subject: [PATCH 09/10] refactor: attach host block header to SimEnv --- bin/builder.rs | 2 +- src/config.rs | 13 ++-- src/tasks/block/sim.rs | 65 ++++++++++------- src/tasks/cache/task.rs | 14 ++-- src/tasks/env.rs | 140 ++++++++++++++++++++++++++---------- src/tasks/submit.rs | 93 ++++++++++++------------ tests/block_builder_test.rs | 2 +- tests/cache.rs | 2 +- tests/env.rs | 2 +- 9 files changed, 204 insertions(+), 129 deletions(-) diff --git a/bin/builder.rs b/bin/builder.rs index 7ee26fa..4d7d949 100644 --- a/bin/builder.rs +++ b/bin/builder.rs @@ -22,7 +22,7 @@ async fn main() -> eyre::Result<()> { let constants = SignetSystemConstants::pecorino(); // Spawn the EnvTask - let env_task = config.env_task(); + let env_task = config.env_task().await; let (block_env, env_jh) = env_task.spawn(); // Spawn the cache system diff --git a/src/config.rs b/src/config.rs index f869fad..0e6778e 100644 --- a/src/config.rs +++ b/src/config.rs @@ -3,7 +3,7 @@ use crate::{ tasks::{ block::cfg::SignetCfgEnv, cache::{BundlePoller, CacheSystem, CacheTask, TxPoller}, - env::EnvTask, + env::{EnvTask, SimEnv}, }, }; use alloy::{ @@ -29,7 +29,6 @@ use init4_bin_base::{ use signet_zenith::Zenith; use std::borrow::Cow; use tokio::sync::watch; -use trevm::revm::context::BlockEnv; /// Type alias for the provider used to simulate against rollup state. pub type RuProvider = RootProvider; @@ -246,9 +245,11 @@ impl BuilderConfig { } /// Create an [`EnvTask`] using this config. - pub fn env_task(&self) -> EnvTask { - let provider = self.connect_ru_provider(); - EnvTask::new(self.clone(), provider) + pub async fn env_task(&self) -> EnvTask { + let ru_provider = self.connect_ru_provider(); + let host_provider = + self.connect_host_provider().await.expect("failed to configure host provider"); + EnvTask::new(self.clone(), ru_provider, host_provider) } /// Spawn a new [`CacheSystem`] using this config. This contains the @@ -256,7 +257,7 @@ impl BuilderConfig { /// well as the [`SimCache`] and the block env watcher. /// /// [`SimCache`]: signet_sim::SimCache - pub fn spawn_cache_system(&self, block_env: watch::Receiver>) -> CacheSystem { + pub fn spawn_cache_system(&self, block_env: watch::Receiver>) -> CacheSystem { // Tx Poller pulls transactions from the cache let tx_poller = TxPoller::new(self); let (tx_receiver, tx_poller) = tx_poller.spawn(); diff --git a/src/tasks/block/sim.rs b/src/tasks/block/sim.rs index 7e274a4..97ad6d8 100644 --- a/src/tasks/block/sim.rs +++ b/src/tasks/block/sim.rs @@ -1,14 +1,22 @@ //! `block.rs` contains the Simulator and everything that wires it into an //! actor that handles the simulation of a stream of bundles and transactions //! and turns them into valid Pecorino blocks for network submission. -use crate::config::{BuilderConfig, RuProvider}; -use alloy::{eips::BlockId, network::Ethereum, providers::Provider}; +use crate::{ + config::{BuilderConfig, RuProvider}, + tasks::env::SimEnv, +}; +use alloy::{ + eips::BlockId, + network::Ethereum, + providers::Provider, +}; use init4_bin_base::{ deps::tracing::{debug, error}, utils::calc::SlotCalculator, }; use signet_sim::{BlockBuild, BuiltBlock, SimCache}; use signet_types::constants::SignetSystemConstants; +use tracing::info; use std::time::{Duration, Instant}; use tokio::{ sync::{ @@ -35,7 +43,7 @@ pub struct Simulator { /// A provider that cannot sign transactions, used for interacting with the rollup. pub ru_provider: RuProvider, /// The block configuration environment on which to simulate - pub block_env: watch::Receiver>, + pub sim_env: watch::Receiver>, } /// SimResult bundles a BuiltBlock to the BlockEnv it was simulated against. @@ -44,7 +52,7 @@ pub struct SimResult { /// The block built with the successfully simulated transactions pub block: BuiltBlock, /// The block environment the transactions were simulated against. - pub env: BlockEnv, + pub env: SimEnv, } impl Simulator { @@ -62,9 +70,9 @@ impl Simulator { pub fn new( config: &BuilderConfig, ru_provider: RuProvider, - block_env: watch::Receiver>, + sim_env: watch::Receiver>, ) -> Self { - Self { config: config.clone(), ru_provider, block_env } + Self { config: config.clone(), ru_provider, sim_env } } /// Get the slot calculator. @@ -74,6 +82,10 @@ impl Simulator { /// Handles building a single block. /// + /// Builds a block in the block environment with items from the simulation cache + /// against the database state. When the `finish_by` deadline is reached, it + /// stops simulating and returns the block. + /// /// # Arguments /// /// - `constants`: The system constants for the rollup. @@ -93,7 +105,6 @@ impl Simulator { ) -> eyre::Result { debug!( block_number = block_env.number, - ?finish_by, tx_count = sim_items.len(), "starting block build", ); @@ -114,15 +125,15 @@ impl Simulator { let built_block = block_build.build().await; debug!( tx_count = built_block.tx_count(), - block_number = ?built_block.block_number(), + block_number = built_block.block_number(), "block simulation completed", ); Ok(built_block) } - /// Spawns the simulator task, which handles the setup and sets the deadline - /// for the each round of simulation. + /// Spawns the simulator task, which ticks along the simulation loop + /// as it receives block environments. /// /// # Arguments /// @@ -144,14 +155,16 @@ impl Simulator { tokio::spawn(async move { self.run_simulator(constants, cache, submit_sender).await }) } - /// Continuously runs the block simulation and submission loop. + /// This function runs indefinitely, waiting for the block environment to be set and checking + /// if the current slot is valid before building a block and sending it along for to the submit channel. /// - /// This function clones the simulation cache, calculates a deadline for block building, - /// attempts to build a block using the latest cache and constants, and submits the built - /// block through the provided channel. If an error occurs during block building or submission, - /// it logs the error and continues the loop. - /// - /// This function runs indefinitely and never returns. + /// If it is authorized for the current slot, then the simulator task + /// - clones the simulation cache, + /// - calculates a deadline for block building, + /// - attempts to build a block using the latest cache and constants, + /// - then submits the built block through the provided channel. + /// + /// If an error occurs during block building or submission, it logs the error and continues the loop. /// /// # Arguments /// @@ -166,19 +179,23 @@ impl Simulator { ) { loop { // Wait for the block environment to be set - if self.block_env.changed().await.is_err() { - error!("block_env channel closed"); + if self.sim_env.changed().await.is_err() { + error!("block_env channel closed - shutting down simulator task"); return; } + let Some(sim_env) = self.sim_env.borrow_and_update().clone() else { return }; + info!(block_number = sim_env.signet.number, "new block environment received"); - let Some(block_env) = self.block_env.borrow_and_update().clone() else { return }; - + // Calculate the deadline for this block simulation. + // NB: This must happen _after_ taking a reference to the sim cache, + // waiting for a new block, and checking current slot authorization. let finish_by = self.calculate_deadline(); let sim_cache = cache.clone(); - match self.handle_build(constants, sim_cache, finish_by, block_env.clone()).await { + match self.handle_build(constants, sim_cache, finish_by, sim_env.signet.clone()).await + { Ok(block) => { - debug!(block = ?block.block_number(), tx_count = block.transactions().len(), "built block"); - let _ = submit_sender.send(SimResult { block, env: block_env }); + debug!(block = ?block.block_number(), tx_count = block.transactions().len(), "built simulated block"); + let _ = submit_sender.send(SimResult { block, env: sim_env }); } Err(e) => { error!(err = %e, "failed to build block"); diff --git a/src/tasks/cache/task.rs b/src/tasks/cache/task.rs index 5c4fea5..eda2311 100644 --- a/src/tasks/cache/task.rs +++ b/src/tasks/cache/task.rs @@ -6,7 +6,8 @@ use tokio::{ sync::{mpsc, watch}, task::JoinHandle, }; -use trevm::revm::context::BlockEnv; + +use crate::tasks::env::SimEnv; /// Cache task for the block builder. /// @@ -16,8 +17,7 @@ use trevm::revm::context::BlockEnv; #[derive(Debug)] pub struct CacheTask { /// The channel to receive the block environment. - env: watch::Receiver>, - + env: watch::Receiver>, /// The channel to receive the transaction bundles. bundles: mpsc::UnboundedReceiver, /// The channel to receive the transactions. @@ -27,7 +27,7 @@ pub struct CacheTask { impl CacheTask { /// Create a new cache task with the given cache and channels. pub const fn new( - env: watch::Receiver>, + env: watch::Receiver>, bundles: mpsc::UnboundedReceiver, txns: mpsc::UnboundedReceiver, ) -> Self { @@ -45,10 +45,10 @@ impl CacheTask { break; } if let Some(env) = self.env.borrow_and_update().as_ref() { - basefee = env.basefee; - info!(basefee, number = env.number, timestamp = env.timestamp, "block env changed, clearing cache"); + basefee = env.signet.basefee; + info!(basefee, env.signet.number, env.signet.timestamp, "rollup block env changed, clearing cache"); cache.clean( - env.number, env.timestamp + env.signet.number, env.signet.timestamp ); } } diff --git a/src/tasks/env.rs b/src/tasks/env.rs index a2259ca..a568823 100644 --- a/src/tasks/env.rs +++ b/src/tasks/env.rs @@ -1,7 +1,7 @@ -use crate::config::{BuilderConfig, RuProvider}; +use crate::config::{BuilderConfig, HostProvider, RuProvider}; use alloy::{ consensus::Header, - eips::eip1559::BaseFeeParams, + eips::{BlockId, BlockNumberOrTag, eip1559::BaseFeeParams}, primitives::{B256, U256}, providers::Provider, }; @@ -14,17 +14,34 @@ use trevm::revm::{context::BlockEnv, context_interface::block::BlobExcessGasAndP /// A task that constructs a BlockEnv for the next block in the rollup chain. #[derive(Debug, Clone)] pub struct EnvTask { + /// Builder configuration values. config: BuilderConfig, - provider: RuProvider, + /// Rollup provider is used to get the latest rollup block header for simulation. + ru_provider: RuProvider, + /// Host provider is used to get the previous block header for gas estimation. + host_provider: HostProvider, +} + +/// Contains a signet BlockEnv and its corresponding host Header. +#[derive(Debug, Clone)] +pub struct SimEnv { + /// The signet block environment, for rollup block simulation. + pub signet: BlockEnv, + /// The host environment header, for host transaction submission pricing. + pub host: Header, } impl EnvTask { - /// Create a new EnvTask with the given config and provider. - pub const fn new(config: BuilderConfig, provider: RuProvider) -> Self { - Self { config, provider } + /// Create a new [`EnvTask`] with the given config and providers. + pub const fn new( + config: BuilderConfig, + ru_provider: RuProvider, + host_provider: HostProvider, + ) -> Self { + Self { config, ru_provider, host_provider } } - /// Construct a BlockEnv by making calls to the provider. + /// Construct a [`BlockEnv`] by from the previous block header. fn construct_block_env(&self, previous: &Header) -> BlockEnv { BlockEnv { number: previous.number + 1, @@ -44,10 +61,10 @@ impl EnvTask { } } - /// Construct the BlockEnv and send it to the sender. - async fn task_fut(self, sender: watch::Sender>) { + /// Returns a sender that sends [`SimEnv`] for communicating the next block environment. + async fn task_fut(self, sender: watch::Sender>) { let span = info_span!("EnvTask::task_fut::init"); - let mut poller = match self.provider.watch_blocks().instrument(span.clone()).await { + let mut poller = match self.ru_provider.watch_blocks().instrument(span.clone()).await { Ok(poller) => poller, Err(err) => { let _span = span.enter(); @@ -63,43 +80,37 @@ impl EnvTask { while let Some(blocks) = blocks.next().instrument(info_span!("EnvTask::task_fut::stream")).await { - let Some(block) = blocks.last() else { + let Some(block_hash) = blocks.last() else { // This case occurs when there are no changes to the block, // so we do nothing. continue; }; - let span = info_span!("EnvTask::task_fut::loop", hash = %block, number = tracing::field::Empty); - - let previous = match self - .provider - .get_block((*block).into()) - .into_future() - .instrument(span.clone()) - .await - { - Ok(Some(block)) => block.header.inner, - Ok(None) => { - let _span = span.enter(); - let _ = sender.send(None); - debug!("block not found"); - // This may mean the chain had a rollback, so the next poll - // should find something. - continue; - } - Err(err) => { - let _span = span.enter(); - let _ = sender.send(None); - error!(%err, "Failed to get latest block"); - // Error may be transient, so we should not break the loop. + let span = + info_span!("EnvTask::task_fut::loop", %block_hash, number = tracing::field::Empty); + + // Get the rollup header for rollup block simulation environment configuration + let rollup_header = match self.get_latest_rollup_header(&sender, block_hash, &span).await { + Some(value) => value, + None => continue, + }; + debug!(?rollup_header.number, "pulled rollup block for simulation"); + + // Get the host header for blob transaction submission gas pricing + let host_header = match self.get_host_header().await { + Ok(header) => header, + Err(_) => { + error!("failed to get host header - skipping block"); continue; } }; - span.record("number", previous.number); + debug!(?host_header.base_fee_per_gas, "pulled previous host header for gas calculation"); + span.record("rollup_block_number", rollup_header.number); - let env = self.construct_block_env(&previous); - debug!(block_number = ?env.number, env.basefee, "constructed latest block env"); + // Construct the block env using the previous block header + let signet_env = self.construct_block_env(&host_header); + debug!(block_number = signet_env.number, signet_env.basefee, "constructed signet block env"); - if sender.send(Some(env)).is_err() { + if sender.send(Some(SimEnv { signet: signet_env, host: host_header })).is_err() { // The receiver has been dropped, so we can stop the task. debug!("receiver dropped, stopping task"); break; @@ -107,8 +118,59 @@ impl EnvTask { } } + /// Gets the latest host [`Header`]. + /// NB: This doesn't need to correlate perfectly with the rollup blocks, + /// since we only use the previous host block [`Header`] for gas estimation. + async fn get_host_header(&self) -> eyre::Result
{ + let previous = self + .host_provider + .get_block(BlockId::Number(BlockNumberOrTag::Latest)) + .into_future() + .await?; + debug!(?previous, "got host block for hash"); + + match previous { + Some(block) => Ok(block.header.inner), + None => Err(eyre::eyre!("host block not found")), + } + } + + /// Get latest rollup [`Header`] for the given block hash. + async fn get_latest_rollup_header( + &self, + sender: &watch::Sender>, + block: &alloy::primitives::FixedBytes<32>, + span: &tracing::Span, + ) -> Option
{ + let previous = match self + .ru_provider + .get_block((*block).into()) + .into_future() + .instrument(span.clone()) + .await + { + Ok(Some(block)) => block.header.inner, + Ok(None) => { + let _span = span.enter(); + let _ = sender.send(None); + debug!("rollup block not found"); + // This may mean the chain had a rollback, so the next poll + // should find something. + return None; + } + Err(err) => { + let _span = span.enter(); + let _ = sender.send(None); + error!(%err, "Failed to get latest block"); + // Error may be transient, so we should not break the loop. + return None; + } + }; + Some(previous) + } + /// Spawn the task and return a watch::Receiver for the BlockEnv. - pub fn spawn(self) -> (watch::Receiver>, JoinHandle<()>) { + pub fn spawn(self) -> (watch::Receiver>, JoinHandle<()>) { let (sender, receiver) = watch::channel(None); let fut = self.task_fut(sender); let jh = tokio::spawn(fut); diff --git a/src/tasks/submit.rs b/src/tasks/submit.rs index bcf00df..77d51e7 100644 --- a/src/tasks/submit.rs +++ b/src/tasks/submit.rs @@ -1,10 +1,11 @@ use crate::{ config::{HostProvider, ZenithInstance}, quincey::Quincey, + tasks::env::SimEnv, utils::extract_signature_components, }; use alloy::{ - consensus::{SimpleCoder, constants::GWEI_TO_WEI}, + consensus::{Header, SimpleCoder, constants::GWEI_TO_WEI}, eips::BlockNumberOrTag, network::{TransactionBuilder, TransactionBuilder4844}, primitives::{Bytes, FixedBytes, TxHash, U256}, @@ -29,9 +30,8 @@ use tokio::{ sync::mpsc::{self}, task::JoinHandle, }; -use trevm::revm::context::BlockEnv; -use super::block::sim::SimResult; +use crate::tasks::block::sim::SimResult; macro_rules! spawn_provider_send { ($provider:expr, $tx:expr) => { @@ -208,13 +208,10 @@ impl SubmitTask { /// correct height, chain ID, gas limit, and rollup reward address. #[instrument(skip_all)] async fn construct_sig_request(&self, contents: &BuiltBlock) -> eyre::Result { - let ru_chain_id = U256::from(self.config.ru_chain_id); - let next_block_height = self.next_host_block_height().await?; - Ok(SignRequest { - host_block_number: U256::from(next_block_height), + host_block_number: U256::from(self.next_host_block_height().await?), host_chain_id: U256::from(self.config.host_chain_id), - ru_chain_id, + ru_chain_id: U256::from(self.config.ru_chain_id), gas_limit: U256::from(self.config.rollup_block_gas_limit), ru_reward_address: self.config.builder_rewards_address, contents: *contents.contents_hash(), @@ -244,9 +241,9 @@ impl SubmitTask { retry_count: usize, resp: &SignResponse, block: &BuiltBlock, - block_env: &BlockEnv, + sim_env: &SimEnv, ) -> eyre::Result { - let tx = self.prepare_tx(retry_count, resp, block, block_env).await?; + let tx = self.prepare_tx(retry_count, resp, block, sim_env).await?; self.send_transaction(resp, tx).await } @@ -258,11 +255,10 @@ impl SubmitTask { retry_count: usize, resp: &SignResponse, block: &BuiltBlock, - block_env: &BlockEnv, + sim_env: &SimEnv, ) -> Result { // Create the transaction request with the signature values - let tx: TransactionRequest = - self.new_tx_request(retry_count, resp, block, block_env).await?; + let tx: TransactionRequest = self.new_tx_request(retry_count, resp, block, sim_env).await?; // Simulate the transaction with a call to the host provider and report any errors if let Err(err) = self.sim_with_call(&tx).await { @@ -290,7 +286,7 @@ impl SubmitTask { retry_count: usize, resp: &SignResponse, block: &BuiltBlock, - block_env: &BlockEnv, + sim_env: &SimEnv, ) -> Result { // manually retrieve nonce let nonce = @@ -301,7 +297,7 @@ impl SubmitTask { let (v, r, s) = extract_signature_components(&resp.sig); let (max_fee_per_gas, max_priority_fee_per_gas, max_fee_per_blob_gas) = - calculate_gas(retry_count, block_env); + calculate_gas(retry_count, sim_env.host.clone()); // Build the block header let header: BlockHeader = BlockHeader { @@ -311,11 +307,11 @@ impl SubmitTask { rewardAddress: resp.req.ru_reward_address, blockDataHash: *block.contents_hash(), }; - debug!(?header, "built block header"); + debug!(?header.hostBlockNumber, "built rollup block header"); // Extract fills from the built block let fills = self.extract_fills(block); - debug!(fill_count = fills.len(), "extracted fills"); + debug!(fill_count = fills.len(), "extracted fills from rollup block"); // Create a blob transaction with the blob header and signature values and return it let tx = self @@ -383,7 +379,7 @@ impl SubmitTask { &self, retry_count: usize, block: &BuiltBlock, - block_env: &BlockEnv, + sim_env: &SimEnv, ) -> eyre::Result { info!(retry_count, txns = block.tx_count(), "handling inbound block"); let Ok(sig_request) = self.construct_sig_request(block).await.inspect_err(|e| { @@ -401,14 +397,14 @@ impl SubmitTask { let signed = self.quincey.get_signature(&sig_request).await?; - self.submit_transaction(retry_count, &signed, block, block_env).await + self.submit_transaction(retry_count, &signed, block, sim_env).await } /// Handles the retry logic for the inbound block. async fn retrying_handle_inbound( &self, block: &BuiltBlock, - block_env: &BlockEnv, + sim_env: &SimEnv, retry_limit: usize, ) -> eyre::Result { let mut retries = 0; @@ -422,8 +418,7 @@ impl SubmitTask { let span = debug_span!("SubmitTask::retrying_handle_inbound", retries); let inbound_result = - match self.handle_inbound(retries, block, block_env).instrument(span.clone()).await - { + match self.handle_inbound(retries, block, sim_env).instrument(span.clone()).await { Ok(control_flow) => control_flow, Err(err) => { // Delay until next slot if we get a 403 error @@ -505,25 +500,28 @@ impl SubmitTask { block.host_fills().iter().map(FillPermit2::from).collect() } - /// Task future for the submit task - /// NB: This task assumes that the simulator will only send it blocks for - /// slots that it's assigned. + /// Task future for the submit task. This function runs the main loop of the task. async fn task_future(self, mut inbound: mpsc::UnboundedReceiver) { loop { // Wait to receive a new block - let Some(result) = inbound.recv().await else { + let Some(sim_result) = inbound.recv().await else { debug!("upstream task gone - exiting submit task"); break; }; - debug!(block_number = result.block.block_number(), "submit channel received block"); + debug!(block_number = sim_result.block.block_number(), "submit channel received block"); // Don't submit empty blocks - if result.block.is_empty() { - debug!("received empty block - skipping"); + if sim_result.block.is_empty() { + debug!( + block_number = sim_result.block.block_number(), + "received empty block - skipping" + ); continue; } - if let Err(e) = self.retrying_handle_inbound(&result.block, &result.env, 3).await { + if let Err(e) = + self.retrying_handle_inbound(&sim_result.block, &sim_result.env, 3).await + { error!(error = %e, "error handling inbound block"); continue; } @@ -534,33 +532,30 @@ impl SubmitTask { pub fn spawn(self) -> (mpsc::UnboundedSender, JoinHandle<()>) { let (sender, inbound) = mpsc::unbounded_channel::(); let handle = tokio::spawn(self.task_future(inbound)); - (sender, handle) } } /// Calculates gas parameters based on the block environment and retry count. -fn calculate_gas(retry_count: usize, block_env: &BlockEnv) -> (u128, u128, u128) { +fn calculate_gas(retry_count: usize, prev_header: Header) -> (u128, u128, u128) { let fallback_blob_basefee = 500; + let fallback_basefee = 7; - match block_env.blob_excess_gas_and_price { - Some(excess) => { - if excess.blob_gasprice == 0 { - warn!("blob excess gas price is zero, using default blob base fee"); - return bump_gas_from_retries( - retry_count, - block_env.basefee, - fallback_blob_basefee, - ); - } + let base_fee_per_gas = match prev_header.base_fee_per_gas { + Some(basefee) => basefee, + None => fallback_basefee, + }; - bump_gas_from_retries(retry_count, block_env.basefee, excess.blob_gasprice) - } - None => { - warn!("no blob excess gas and price in block env, using defaults"); - bump_gas_from_retries(retry_count, block_env.basefee, fallback_blob_basefee) - } - } + let parent_blob_basefee = prev_header.excess_blob_gas.unwrap_or(0) as u128; + let blob_basefee = if parent_blob_basefee > 0 { + // Use the parent blob base fee if available + parent_blob_basefee + } else { + // Fallback to a default value if no blob base fee is set + fallback_blob_basefee + }; + + bump_gas_from_retries(retry_count, base_fee_per_gas, blob_basefee as u128) } /// Bumps the gas parameters based on the retry count, base fee, and blob base fee. diff --git a/tests/block_builder_test.rs b/tests/block_builder_test.rs index 1317a26..3aa09a6 100644 --- a/tests/block_builder_test.rs +++ b/tests/block_builder_test.rs @@ -42,7 +42,7 @@ async fn test_handle_build() { // Create a rollup provider let ru_provider = RootProvider::::new_http(anvil_instance.endpoint_url()); - let block_env = config.env_task().spawn().0; + let block_env = config.env_task().await.spawn().0; let block_builder = Simulator::new(&config, ru_provider.clone(), block_env); diff --git a/tests/cache.rs b/tests/cache.rs index bd80f19..b2f5f6f 100644 --- a/tests/cache.rs +++ b/tests/cache.rs @@ -9,7 +9,7 @@ async fn test_bundle_poller_roundtrip() -> eyre::Result<()> { let config = setup_test_config().unwrap(); - let (block_env, _jh) = config.env_task().spawn(); + let (block_env, _jh) = config.env_task().await.spawn(); let cache = config.spawn_cache_system(block_env); tokio::time::sleep(Duration::from_secs(12)).await; diff --git a/tests/env.rs b/tests/env.rs index adebe43..c5f6890 100644 --- a/tests/env.rs +++ b/tests/env.rs @@ -7,7 +7,7 @@ async fn test_bundle_poller_roundtrip() { let config = setup_test_config().unwrap(); let env_task = config.env_task(); - let (mut env_watcher, _jh) = env_task.spawn(); + let (mut env_watcher, _jh) = env_task.await.spawn(); env_watcher.changed().await.unwrap(); let env = env_watcher.borrow_and_update(); From 44bc5cd9968ff0abf93b9d3c9f95f73a5f6caf62 Mon Sep 17 00:00:00 2001 From: dylan Date: Wed, 11 Jun 2025 14:35:56 -0600 Subject: [PATCH 10/10] clippy + fmt --- src/tasks/block/sim.rs | 25 ++++++++----------------- src/tasks/env.rs | 14 +++++++++----- src/tasks/submit.rs | 4 ++-- 3 files changed, 19 insertions(+), 24 deletions(-) diff --git a/src/tasks/block/sim.rs b/src/tasks/block/sim.rs index 97ad6d8..eb424da 100644 --- a/src/tasks/block/sim.rs +++ b/src/tasks/block/sim.rs @@ -5,18 +5,13 @@ use crate::{ config::{BuilderConfig, RuProvider}, tasks::env::SimEnv, }; -use alloy::{ - eips::BlockId, - network::Ethereum, - providers::Provider, -}; +use alloy::{eips::BlockId, network::Ethereum, providers::Provider}; use init4_bin_base::{ deps::tracing::{debug, error}, utils::calc::SlotCalculator, }; use signet_sim::{BlockBuild, BuiltBlock, SimCache}; use signet_types::constants::SignetSystemConstants; -use tracing::info; use std::time::{Duration, Instant}; use tokio::{ sync::{ @@ -25,6 +20,7 @@ use tokio::{ }, task::JoinHandle, }; +use tracing::info; use trevm::revm::{ context::BlockEnv, database::{AlloyDB, WrapDatabaseAsync}, @@ -83,8 +79,8 @@ impl Simulator { /// Handles building a single block. /// /// Builds a block in the block environment with items from the simulation cache - /// against the database state. When the `finish_by` deadline is reached, it - /// stops simulating and returns the block. + /// against the database state. When the `finish_by` deadline is reached, it + /// stops simulating and returns the block. /// /// # Arguments /// @@ -103,11 +99,7 @@ impl Simulator { finish_by: Instant, block_env: BlockEnv, ) -> eyre::Result { - debug!( - block_number = block_env.number, - tx_count = sim_items.len(), - "starting block build", - ); + debug!(block_number = block_env.number, tx_count = sim_items.len(), "starting block build",); let db = self.create_db().await.unwrap(); @@ -155,7 +147,7 @@ impl Simulator { tokio::spawn(async move { self.run_simulator(constants, cache, submit_sender).await }) } - /// This function runs indefinitely, waiting for the block environment to be set and checking + /// This function runs indefinitely, waiting for the block environment to be set and checking /// if the current slot is valid before building a block and sending it along for to the submit channel. /// /// If it is authorized for the current slot, then the simulator task @@ -163,7 +155,7 @@ impl Simulator { /// - calculates a deadline for block building, /// - attempts to build a block using the latest cache and constants, /// - then submits the built block through the provided channel. - /// + /// /// If an error occurs during block building or submission, it logs the error and continues the loop. /// /// # Arguments @@ -191,8 +183,7 @@ impl Simulator { // waiting for a new block, and checking current slot authorization. let finish_by = self.calculate_deadline(); let sim_cache = cache.clone(); - match self.handle_build(constants, sim_cache, finish_by, sim_env.signet.clone()).await - { + match self.handle_build(constants, sim_cache, finish_by, sim_env.signet.clone()).await { Ok(block) => { debug!(block = ?block.block_number(), tx_count = block.transactions().len(), "built simulated block"); let _ = submit_sender.send(SimResult { block, env: sim_env }); diff --git a/src/tasks/env.rs b/src/tasks/env.rs index a568823..cc627c3 100644 --- a/src/tasks/env.rs +++ b/src/tasks/env.rs @@ -89,10 +89,11 @@ impl EnvTask { info_span!("EnvTask::task_fut::loop", %block_hash, number = tracing::field::Empty); // Get the rollup header for rollup block simulation environment configuration - let rollup_header = match self.get_latest_rollup_header(&sender, block_hash, &span).await { - Some(value) => value, - None => continue, - }; + let rollup_header = + match self.get_latest_rollup_header(&sender, block_hash, &span).await { + Some(value) => value, + None => continue, + }; debug!(?rollup_header.number, "pulled rollup block for simulation"); // Get the host header for blob transaction submission gas pricing @@ -108,7 +109,10 @@ impl EnvTask { // Construct the block env using the previous block header let signet_env = self.construct_block_env(&host_header); - debug!(block_number = signet_env.number, signet_env.basefee, "constructed signet block env"); + debug!( + block_number = signet_env.number, + signet_env.basefee, "constructed signet block env" + ); if sender.send(Some(SimEnv { signet: signet_env, host: host_header })).is_err() { // The receiver has been dropped, so we can stop the task. diff --git a/src/tasks/submit.rs b/src/tasks/submit.rs index 77d51e7..b7cd17e 100644 --- a/src/tasks/submit.rs +++ b/src/tasks/submit.rs @@ -552,10 +552,10 @@ fn calculate_gas(retry_count: usize, prev_header: Header) -> (u128, u128, u128) parent_blob_basefee } else { // Fallback to a default value if no blob base fee is set - fallback_blob_basefee + fallback_blob_basefee }; - bump_gas_from_retries(retry_count, base_fee_per_gas, blob_basefee as u128) + bump_gas_from_retries(retry_count, base_fee_per_gas, blob_basefee) } /// Bumps the gas parameters based on the retry count, base fee, and blob base fee.