From 3e2c5fa52f590d7708e56f51aa901e37de34e6d3 Mon Sep 17 00:00:00 2001 From: dylan Date: Tue, 3 Jun 2025 19:10:11 -0600 Subject: [PATCH 1/7] passes sim result to the submit tasks - adds a SimResult type that binds a BlockEnv to a BuiltBlock - passess that SimResult to the SubmitTask for gas calculations --- bin/submit_transaction.rs | 20 +++++- src/tasks/block/sim.rs | 83 +++++++++++++++++++------ src/tasks/env.rs | 5 +- src/tasks/submit.rs | 119 ++++++++++++++++++++++-------------- tests/block_builder_test.rs | 60 +----------------- 5 files changed, 160 insertions(+), 127 deletions(-) diff --git a/bin/submit_transaction.rs b/bin/submit_transaction.rs index 71b10f4..071f74b 100644 --- a/bin/submit_transaction.rs +++ b/bin/submit_transaction.rs @@ -1,3 +1,5 @@ +//! A simple transaction submitter that sends a transaction to a recipient address +//! on a regular interval for the purposes of roughly testing rollup mining. use alloy::{ network::{EthereumWallet, TransactionBuilder}, primitives::{Address, U256}, @@ -67,18 +69,29 @@ async fn main() { } } +/// Sends a transaction to the specified recipient address async fn send_transaction(provider: &HostProvider, recipient_address: Address) { // construct simple transaction to send ETH to a recipient + let nonce = match provider.get_transaction_count(provider.default_signer_address()).await { + Ok(count) => count, + Err(e) => { + error!(error = ?e, "failed to get transaction count"); + return; + } + }; + let tx = TransactionRequest::default() .with_from(provider.default_signer_address()) .with_to(recipient_address) .with_value(U256::from(1)) + .with_nonce(nonce) .with_gas_limit(30_000); // start timer to measure how long it takes to mine the transaction let dispatch_start_time: Instant = Instant::now(); // dispatch the transaction + debug!(?tx.nonce, "sending transaction with nonce"); let result = provider.send_transaction(tx).await.unwrap(); // wait for the transaction to mine @@ -95,10 +108,13 @@ async fn send_transaction(provider: &HostProvider, recipient_address: Address) { } }; - let hash = receipt.transaction_hash.to_string(); + record_metrics(dispatch_start_time, receipt); +} - // record metrics for how long it took to mine the transaction +/// Record metrics for how long it took to mine the transaction +fn record_metrics(dispatch_start_time: Instant, receipt: alloy::rpc::types::TransactionReceipt) { let mine_time = dispatch_start_time.elapsed().as_secs(); + let hash = receipt.transaction_hash.to_string(); debug!(success = receipt.status(), mine_time, hash, "transaction mined"); histogram!("txn_submitter.tx_mine_time").record(mine_time as f64); } diff --git a/src/tasks/block/sim.rs b/src/tasks/block/sim.rs index ce7ffb3..e59e799 100644 --- a/src/tasks/block/sim.rs +++ b/src/tasks/block/sim.rs @@ -9,7 +9,7 @@ use init4_bin_base::{ }; use signet_sim::{BlockBuild, BuiltBlock, SimCache}; use signet_types::constants::SignetSystemConstants; -use std::time::{Duration, Instant}; +use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; use tokio::{ sync::{ mpsc::{self}, @@ -17,6 +17,7 @@ use tokio::{ }, task::JoinHandle, }; +use tracing::trace; use trevm::revm::{ context::BlockEnv, database::{AlloyDB, WrapDatabaseAsync}, @@ -34,11 +35,19 @@ pub struct Simulator { pub config: BuilderConfig, /// A provider that cannot sign transactions, used for interacting with the rollup. pub ru_provider: RuProvider, - /// The block configuration environment on which to simulate pub block_env: watch::Receiver>, } +/// SimResult bundles a BuiltBlock to the BlockEnv it was simulated against. +#[derive(Debug, Clone)] +pub struct SimResult { + /// The block built with the successfully simulated transactions + pub block: BuiltBlock, + /// The block environment the transactions were simulated against. + pub env: BlockEnv, +} + impl Simulator { /// Creates a new `Simulator` instance. /// @@ -46,6 +55,7 @@ impl Simulator { /// /// - `config`: The configuration for the builder. /// - `ru_provider`: A provider for interacting with the rollup. + /// - `block_env`: A receiver for the block environment to simulate against. /// /// # Returns /// @@ -70,6 +80,7 @@ impl Simulator { /// - `constants`: The system constants for the rollup. /// - `sim_items`: The simulation cache containing transactions and bundles. /// - `finish_by`: The deadline by which the block must be built. + /// - `block_env`: The block environment to simulate against. /// /// # Returns /// @@ -79,14 +90,22 @@ impl Simulator { constants: SignetSystemConstants, sim_items: SimCache, finish_by: Instant, - block: BlockEnv, + block_env: BlockEnv, ) -> eyre::Result { + debug!( + block_number = block_env.number, + deadline = ?self.instant_to_timestamp(finish_by), + tx_count= sim_items.len(), + "starting block build", + ); + let db = self.create_db().await.unwrap(); + let block_build: BlockBuild<_, NoOpInspector> = BlockBuild::new( db, constants, self.config.cfg_env(), - block, + block_env, finish_by, self.config.concurrency_limit, sim_items, @@ -94,7 +113,11 @@ impl Simulator { ); let built_block = block_build.build().await; - debug!(block_number = ?built_block.block_number(), "finished building block"); + debug!( + tx_count = built_block.tx_count(), + block_number = ?built_block.block_number(), + "block simulation completed", + ); Ok(built_block) } @@ -115,7 +138,7 @@ impl Simulator { self, constants: SignetSystemConstants, cache: SimCache, - submit_sender: mpsc::UnboundedSender, + submit_sender: mpsc::UnboundedSender, ) -> JoinHandle<()> { debug!("starting simulator task"); @@ -140,26 +163,23 @@ impl Simulator { mut self, constants: SignetSystemConstants, cache: SimCache, - submit_sender: mpsc::UnboundedSender, + submit_sender: mpsc::UnboundedSender, ) { loop { - let sim_cache = cache.clone(); - let finish_by = self.calculate_deadline(); - // Wait for the block environment to be set if self.block_env.changed().await.is_err() { error!("block_env channel closed"); return; } - // If no env, skip this run let Some(block_env) = self.block_env.borrow_and_update().clone() else { return }; - debug!(block_env = ?block_env, "building on block env"); - match self.handle_build(constants, sim_cache, finish_by, block_env).await { + let finish_by = self.calculate_deadline(); + let sim_cache = cache.clone(); + match self.handle_build(constants, sim_cache, finish_by, block_env.clone()).await { Ok(block) => { - debug!(block = ?block, "built block"); - let _ = submit_sender.send(block); + debug!(block = ?block.block_number(), tx_count = block.transactions().len(), "built block"); + let _ = submit_sender.send(SimResult { block, env: block_env }); } Err(e) => { error!(err = %e, "failed to build block"); @@ -178,17 +198,25 @@ impl Simulator { pub fn calculate_deadline(&self) -> Instant { // Get the current timepoint within the slot. let timepoint = self.slot_calculator().current_timepoint_within_slot(); + trace!(timepoint, "current timepoint within slot"); // We have the timepoint in seconds into the slot. To find out what's // remaining, we need to subtract it from the slot duration let remaining = self.slot_calculator().slot_duration() - timepoint; + trace!(remaining, "time remaining in slot"); // We add a 1500 ms buffer to account for sequencer stopping signing. - - let candidate = + let deadline = Instant::now() + Duration::from_secs(remaining) - Duration::from_millis(1500); + trace!(deadline = ?self.instant_to_timestamp(deadline), "calculated deadline for block simulation"); + + let buffered_deadline = deadline.max(Instant::now()); + trace!(?buffered_deadline, "final deadline for block simulation"); - candidate.max(Instant::now()) + let timestamp = self.instant_to_timestamp(buffered_deadline); + trace!(?timestamp, "deadline converted to timestamp"); + + buffered_deadline } /// Creates an `AlloyDB` instance from the rollup provider. @@ -217,4 +245,23 @@ impl Simulator { let wrapped_db: AlloyDatabaseProvider = WrapDatabaseAsync::new(alloy_db).unwrap(); Some(wrapped_db) } + + /// Converts an `Instant` to a UNIX timestamp in seconds and milliseconds. + pub fn instant_to_timestamp(&self, instant: Instant) -> (u64, u128) { + let now_instant = Instant::now(); + let now_system = SystemTime::now(); + + let duration_from_now = now_instant.duration_since(instant); + + // Subtract that duration from the system time + let target_system_time = now_system - duration_from_now; + + let duration_since_epoch = + target_system_time.duration_since(UNIX_EPOCH).expect("Time went backwards"); + + let seconds = duration_since_epoch.as_secs(); + let milliseconds = duration_since_epoch.as_millis(); + + (seconds, milliseconds) + } } diff --git a/src/tasks/env.rs b/src/tasks/env.rs index 448d2c8..a2259ca 100644 --- a/src/tasks/env.rs +++ b/src/tasks/env.rs @@ -66,7 +66,6 @@ impl EnvTask { let Some(block) = blocks.last() else { // This case occurs when there are no changes to the block, // so we do nothing. - debug!("empty filter changes"); continue; }; let span = info_span!("EnvTask::task_fut::loop", hash = %block, number = tracing::field::Empty); @@ -96,10 +95,10 @@ impl EnvTask { } }; span.record("number", previous.number); - debug!("retrieved latest block"); let env = self.construct_block_env(&previous); - debug!(?env, "constructed block env"); + debug!(block_number = ?env.number, env.basefee, "constructed latest block env"); + if sender.send(Some(env)).is_err() { // The receiver has been dropped, so we can stop the task. debug!("receiver dropped, stopping task"); diff --git a/src/tasks/submit.rs b/src/tasks/submit.rs index cab53cb..a081239 100644 --- a/src/tasks/submit.rs +++ b/src/tasks/submit.rs @@ -25,14 +25,13 @@ use signet_zenith::{ Zenith::{self, IncorrectHostBlock}, }; use std::time::{Instant, UNIX_EPOCH}; -use tokio::{sync::mpsc, task::JoinHandle}; +use tokio::{ + sync::mpsc::{self}, + task::JoinHandle, +}; +use trevm::revm::context::BlockEnv; -/// Base maximum fee per gas to use as a starting point for retry bumps -pub const BASE_FEE_PER_GAS: u128 = 10 * GWEI_TO_WEI as u128; -/// Base max priority fee per gas to use as a starting point for retry bumps -pub const BASE_MAX_PRIORITY_FEE_PER_GAS: u128 = 2 * GWEI_TO_WEI as u128; -/// Base maximum fee per blob gas to use as a starting point for retry bumps -pub const BASE_MAX_FEE_PER_BLOB_GAS: u128 = GWEI_TO_WEI as u128; +use super::block::sim::SimResult; macro_rules! spawn_provider_send { ($provider:expr, $tx:expr) => { @@ -222,7 +221,7 @@ impl SubmitTask { }) } - /// Builds blob transaction and encodes the sidecar for it from the provided header and signature values + /// Encodes the sidecar and then builds the 4844 blob transaction from the provided header and signature values. fn build_blob_tx( &self, fills: Vec, @@ -245,8 +244,9 @@ impl SubmitTask { retry_count: usize, resp: &SignResponse, block: &BuiltBlock, + block_env: &BlockEnv, ) -> eyre::Result { - let tx = self.prepare_tx(retry_count, resp, block).await?; + let tx = self.prepare_tx(retry_count, resp, block, block_env).await?; self.send_transaction(resp, tx).await } @@ -258,9 +258,11 @@ impl SubmitTask { retry_count: usize, resp: &SignResponse, block: &BuiltBlock, + block_env: &BlockEnv, ) -> Result { // Create the transaction request with the signature values - let tx: TransactionRequest = self.new_tx_request(retry_count, resp, block).await?; + let tx: TransactionRequest = + self.new_tx_request(retry_count, resp, block, block_env).await?; // Simulate the transaction with a call to the host provider and report any errors if let Err(err) = self.sim_with_call(&tx).await { @@ -288,6 +290,7 @@ impl SubmitTask { retry_count: usize, resp: &SignResponse, block: &BuiltBlock, + block_env: &BlockEnv, ) -> Result { // manually retrieve nonce let nonce = @@ -297,14 +300,8 @@ impl SubmitTask { // Extract the signature components from the response let (v, r, s) = extract_signature_components(&resp.sig); - // Calculate gas limits based on retry attempts let (max_fee_per_gas, max_priority_fee_per_gas, max_fee_per_blob_gas) = - calculate_gas_limits( - retry_count, - BASE_FEE_PER_GAS, - BASE_MAX_PRIORITY_FEE_PER_GAS, - BASE_MAX_FEE_PER_BLOB_GAS, - ); + calculate_gas(retry_count, block_env); // Build the block header let header: BlockHeader = BlockHeader { @@ -386,6 +383,7 @@ impl SubmitTask { &self, retry_count: usize, block: &BuiltBlock, + block_env: &BlockEnv, ) -> eyre::Result { info!(retry_count, txns = block.tx_count(), "handling inbound block"); let Ok(sig_request) = self.construct_sig_request(block).await.inspect_err(|e| { @@ -397,18 +395,20 @@ impl SubmitTask { debug!( host_block_number = %sig_request.host_block_number, ru_chain_id = %sig_request.ru_chain_id, + tx_count = block.tx_count(), "constructed signature request for host block" ); let signed = self.quincey.get_signature(&sig_request).await?; - self.submit_transaction(retry_count, &signed, block).await + self.submit_transaction(retry_count, &signed, block, block_env).await } /// Handles the retry logic for the inbound block. async fn retrying_handle_inbound( &self, block: &BuiltBlock, + block_env: &BlockEnv, retry_limit: usize, ) -> eyre::Result { let mut retries = 0; @@ -418,11 +418,11 @@ impl SubmitTask { // Retry loop let result = loop { - // Log the retry attempt let span = debug_span!("SubmitTask::retrying_handle_inbound", retries); let inbound_result = - match self.handle_inbound(retries, block).instrument(span.clone()).await { + match self.handle_inbound(retries, block, block_env).instrument(span.clone()).await + { Ok(control_flow) => control_flow, Err(err) => { // Delay until next slot if we get a 403 error @@ -431,6 +431,7 @@ impl SubmitTask { debug!(slot_number, "403 detected - skipping slot"); return Ok(ControlFlow::Skip); } else { + // Otherwise, log error and retry error!(error = %err, "error handling inbound block"); } @@ -500,29 +501,33 @@ impl SubmitTask { /// Task future for the submit task /// NB: This task assumes that the simulator will only send it blocks for /// slots that it's assigned. - async fn task_future(self, mut inbound: mpsc::UnboundedReceiver) { + async fn task_future(self, mut inbound: mpsc::UnboundedReceiver) { // Holds a reference to the last block we attempted to submit let mut last_block_attempted: u64 = 0; loop { // Wait to receive a new block - let Some(block) = inbound.recv().await else { + let Some(result) = inbound.recv().await else { debug!("upstream task gone"); break; }; - debug!(block_number = block.block_number(), ?block, "submit channel received block"); + + debug!(block_number = result.block.block_number(), "submit channel received block"); // Only attempt each block number once - if block.block_number() == last_block_attempted { - debug!("block number is unchanged from last attempt - skipping"); + if result.block.block_number() == last_block_attempted { + debug!( + block_number = result.block.block_number(), + "block number is unchanged from last attempt - skipping" + ); continue; } // This means we have encountered a new block, so reset the last block attempted - last_block_attempted = block.block_number(); + last_block_attempted = result.block.block_number(); debug!(last_block_attempted, "resetting last block attempted"); - if self.retrying_handle_inbound(&block, 3).await.is_err() { + if self.retrying_handle_inbound(&result.block, &result.env, 3).await.is_err() { debug!("error handling inbound block"); continue; }; @@ -530,36 +535,60 @@ impl SubmitTask { } /// Spawns the in progress block building task - pub fn spawn(self) -> (mpsc::UnboundedSender, JoinHandle<()>) { - let (sender, inbound) = mpsc::unbounded_channel(); + pub fn spawn(self) -> (mpsc::UnboundedSender, JoinHandle<()>) { + let (sender, inbound) = mpsc::unbounded_channel::(); let handle = tokio::spawn(self.task_future(inbound)); (sender, handle) } } -// Returns gas parameters based on retry counts. -fn calculate_gas_limits( +/// Calculates gas parameters based on the block environment and retry count. +fn calculate_gas(retry_count: usize, block_env: &BlockEnv) -> (u128, u128, u128) { + let fallback_blob_basefee = 500; + + match block_env.blob_excess_gas_and_price { + Some(excess) => { + if excess.blob_gasprice == 0 { + warn!("blob excess gas price is zero, using default blob base fee"); + return bump_gas_from_retries( + retry_count, + block_env.basefee, + fallback_blob_basefee, + ); + } + + bump_gas_from_retries(retry_count, block_env.basefee, excess.blob_gasprice) + } + None => { + warn!("no blob excess gas and price in block env, using defaults"); + bump_gas_from_retries(retry_count, block_env.basefee, fallback_blob_basefee) + } + } +} + +/// Bumps the gas parameters based on the retry count, base fee, and blob base fee. +pub fn bump_gas_from_retries( retry_count: usize, - base_max_fee_per_gas: u128, - base_max_priority_fee_per_gas: u128, - base_max_fee_per_blob_gas: u128, + basefee: u64, + blob_basefee: u128, ) -> (u128, u128, u128) { - let bump_multiplier = 1150u128.pow(retry_count as u32); // 15% bump - let blob_bump_multiplier = 2000u128.pow(retry_count as u32); // 100% bump (double each time) for blob gas - let bump_divisor = 1000u128.pow(retry_count as u32); + const PRIORITY_FEE_BASE: u64 = 2 * GWEI_TO_WEI; + const BASE_MULTIPLIER: u128 = 2; + const BLOB_MULTIPLIER: u128 = 2; + + // Increase priority fee by 20% per retry + let priority_fee = + PRIORITY_FEE_BASE * (12u64.pow(retry_count as u32) / 10u64.pow(retry_count as u32)); - let max_fee_per_gas = base_max_fee_per_gas * bump_multiplier / bump_divisor; - let max_priority_fee_per_gas = base_max_priority_fee_per_gas * bump_multiplier / bump_divisor; - let max_fee_per_blob_gas = base_max_fee_per_blob_gas * blob_bump_multiplier / bump_divisor; + // Max fee includes basefee + priority + headroom (double basefee, etc.) + let max_fee_per_gas = (basefee as u128) * BASE_MULTIPLIER + (priority_fee as u128); + let max_fee_per_blob_gas = blob_basefee * BLOB_MULTIPLIER * (retry_count as u128 + 1); debug!( retry_count, - max_fee_per_gas, - max_priority_fee_per_gas, - max_fee_per_blob_gas, - "calculated bumped gas parameters" + max_fee_per_gas, priority_fee, max_fee_per_blob_gas, "calculated bumped gas parameters" ); - (max_fee_per_gas, max_priority_fee_per_gas, max_fee_per_blob_gas) + (max_fee_per_gas, priority_fee as u128, max_fee_per_blob_gas) } diff --git a/tests/block_builder_test.rs b/tests/block_builder_test.rs index a88869b..1317a26 100644 --- a/tests/block_builder_test.rs +++ b/tests/block_builder_test.rs @@ -8,13 +8,12 @@ use alloy::{ signers::local::PrivateKeySigner, }; use builder::{ - tasks::{block::sim::Simulator, cache::CacheTask}, + tasks::block::sim::Simulator, test_utils::{new_signed_tx, setup_logging, setup_test_config, test_block_env}, }; use signet_sim::{SimCache, SimItem}; use signet_types::constants::SignetSystemConstants; use std::time::{Duration, Instant}; -use tokio::{sync::mpsc::unbounded_channel, time::timeout}; /// Tests the `handle_build` method of the `Simulator`. /// @@ -71,60 +70,3 @@ async fn test_handle_build() { assert!(got.is_ok()); assert!(got.unwrap().tx_count() == 2); } - -/// Tests the full block builder loop, including transaction ingestion and block simulation. -/// -/// This test sets up a simulated environment using Anvil, creates a block builder, -/// and verifies that the builder can process incoming transactions and produce a block -/// within a specified timeout. -#[ignore = "integration test"] -#[tokio::test(flavor = "multi_thread", worker_threads = 2)] -async fn test_spawn() { - setup_logging(); - - // Make a test config - let config = setup_test_config().unwrap(); - let constants = SignetSystemConstants::pecorino(); - - // Create an anvil instance for testing - let anvil_instance = Anvil::new().chain_id(signet_constants::pecorino::RU_CHAIN_ID).spawn(); - - // Create a wallet - let keys = anvil_instance.keys(); - let test_key_0 = PrivateKeySigner::from_signing_key(keys[0].clone().into()); - let test_key_1 = PrivateKeySigner::from_signing_key(keys[1].clone().into()); - - // Plumb inputs for the test setup - let (tx_sender, tx_receiver) = unbounded_channel(); - let (_, bundle_receiver) = unbounded_channel(); - let (block_sender, mut block_receiver) = unbounded_channel(); - - let env_task = config.env_task(); - let (block_env, _env_jh) = env_task.spawn(); - - let cache_task = CacheTask::new(block_env.clone(), bundle_receiver, tx_receiver); - let (sim_cache, _cache_jh) = cache_task.spawn(); - - // Create a rollup provider - let ru_provider = RootProvider::::new_http(anvil_instance.endpoint_url()); - - let sim = Simulator::new(&config, ru_provider.clone(), block_env); - - // Finally, Kick off the block builder task. - sim.spawn_simulator_task(constants, sim_cache.clone(), block_sender); - - // Feed in transactions to the tx_sender and wait for the block to be simulated - let tx_1 = new_signed_tx(&test_key_0, 0, U256::from(1_f64), 11_000).unwrap(); - let tx_2 = new_signed_tx(&test_key_1, 0, U256::from(2_f64), 10_000).unwrap(); - tx_sender.send(tx_1).unwrap(); - tx_sender.send(tx_2).unwrap(); - - // Wait for a block with timeout - let result = timeout(Duration::from_secs(5), block_receiver.recv()).await; - assert!(result.is_ok(), "Did not receive block within 5 seconds"); - - // Assert on the block - let block = result.unwrap(); - assert!(block.is_some(), "Block channel closed without receiving a block"); - assert!(block.unwrap().tx_count() == 2); // TODO: Why is this failing? I'm seeing EVM errors but haven't tracked them down yet. -} From 2a2ef38b1b79913d3f968240248608f6e6e8aaf4 Mon Sep 17 00:00:00 2001 From: dylan Date: Mon, 9 Jun 2025 22:00:52 -0600 Subject: [PATCH 2/7] better logging --- src/tasks/mod.rs | 2 +- src/tasks/submit.rs | 16 +++++++++++----- 2 files changed, 12 insertions(+), 6 deletions(-) diff --git a/src/tasks/mod.rs b/src/tasks/mod.rs index eee57d5..4715d74 100644 --- a/src/tasks/mod.rs +++ b/src/tasks/mod.rs @@ -10,5 +10,5 @@ pub mod metrics; /// Tx submission task pub mod submit; -/// Constructs the simualtion environment. +/// Constructs the simulation environment. pub mod env; diff --git a/src/tasks/submit.rs b/src/tasks/submit.rs index a081239..ca6bc9c 100644 --- a/src/tasks/submit.rs +++ b/src/tasks/submit.rs @@ -413,6 +413,7 @@ impl SubmitTask { ) -> eyre::Result { let mut retries = 0; let building_start_time = Instant::now(); + let (current_slot, start, end) = self.calculate_slot_window(); debug!(current_slot, start, end, "calculating target slot window"); @@ -467,9 +468,15 @@ impl SubmitTask { }; // This is reached when `Done` or `Skip` is returned - histogram!("builder.block_build_time") - .record(building_start_time.elapsed().as_millis() as f64); - info!(?result, "finished block building"); + let elapsed = building_start_time.elapsed().as_millis() as f64; + histogram!("builder.block_build_time").record(elapsed); + info!( + ?result, + tx_count = block.tx_count(), + block_number = block.block_number(), + build_time = ?elapsed, + "finished block building" + ); Ok(result) } @@ -508,10 +515,9 @@ impl SubmitTask { loop { // Wait to receive a new block let Some(result) = inbound.recv().await else { - debug!("upstream task gone"); + debug!("upstream task gone - exiting submit task"); break; }; - debug!(block_number = result.block.block_number(), "submit channel received block"); // Only attempt each block number once From ee986eb73ec17a6be41d178f6e7d8db19eaabd39 Mon Sep 17 00:00:00 2001 From: dylan Date: Mon, 9 Jun 2025 22:38:46 -0600 Subject: [PATCH 3/7] cleanup --- src/tasks/block/sim.rs | 28 +++------------------------- src/tasks/submit.rs | 22 +++------------------- 2 files changed, 6 insertions(+), 44 deletions(-) diff --git a/src/tasks/block/sim.rs b/src/tasks/block/sim.rs index e59e799..22a5e69 100644 --- a/src/tasks/block/sim.rs +++ b/src/tasks/block/sim.rs @@ -9,7 +9,7 @@ use init4_bin_base::{ }; use signet_sim::{BlockBuild, BuiltBlock, SimCache}; use signet_types::constants::SignetSystemConstants; -use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; +use std::time::{Duration, Instant}; use tokio::{ sync::{ mpsc::{self}, @@ -94,7 +94,7 @@ impl Simulator { ) -> eyre::Result { debug!( block_number = block_env.number, - deadline = ?self.instant_to_timestamp(finish_by), + ?finish_by, tx_count= sim_items.len(), "starting block build", ); @@ -208,14 +208,11 @@ impl Simulator { // We add a 1500 ms buffer to account for sequencer stopping signing. let deadline = Instant::now() + Duration::from_secs(remaining) - Duration::from_millis(1500); - trace!(deadline = ?self.instant_to_timestamp(deadline), "calculated deadline for block simulation"); + trace!(?deadline, "calculated deadline for block simulation"); let buffered_deadline = deadline.max(Instant::now()); trace!(?buffered_deadline, "final deadline for block simulation"); - let timestamp = self.instant_to_timestamp(buffered_deadline); - trace!(?timestamp, "deadline converted to timestamp"); - buffered_deadline } @@ -245,23 +242,4 @@ impl Simulator { let wrapped_db: AlloyDatabaseProvider = WrapDatabaseAsync::new(alloy_db).unwrap(); Some(wrapped_db) } - - /// Converts an `Instant` to a UNIX timestamp in seconds and milliseconds. - pub fn instant_to_timestamp(&self, instant: Instant) -> (u64, u128) { - let now_instant = Instant::now(); - let now_system = SystemTime::now(); - - let duration_from_now = now_instant.duration_since(instant); - - // Subtract that duration from the system time - let target_system_time = now_system - duration_from_now; - - let duration_since_epoch = - target_system_time.duration_since(UNIX_EPOCH).expect("Time went backwards"); - - let seconds = duration_since_epoch.as_secs(); - let milliseconds = duration_since_epoch.as_millis(); - - (seconds, milliseconds) - } } diff --git a/src/tasks/submit.rs b/src/tasks/submit.rs index ca6bc9c..7a42d56 100644 --- a/src/tasks/submit.rs +++ b/src/tasks/submit.rs @@ -509,9 +509,6 @@ impl SubmitTask { /// NB: This task assumes that the simulator will only send it blocks for /// slots that it's assigned. async fn task_future(self, mut inbound: mpsc::UnboundedReceiver) { - // Holds a reference to the last block we attempted to submit - let mut last_block_attempted: u64 = 0; - loop { // Wait to receive a new block let Some(result) = inbound.recv().await else { @@ -520,23 +517,10 @@ impl SubmitTask { }; debug!(block_number = result.block.block_number(), "submit channel received block"); - // Only attempt each block number once - if result.block.block_number() == last_block_attempted { - debug!( - block_number = result.block.block_number(), - "block number is unchanged from last attempt - skipping" - ); - continue; + if let Err(e) = self.retrying_handle_inbound(&result.block, &result.env, 3).await { + error!(error = %e, "error handling inbound block"); + continue; } - - // This means we have encountered a new block, so reset the last block attempted - last_block_attempted = result.block.block_number(); - debug!(last_block_attempted, "resetting last block attempted"); - - if self.retrying_handle_inbound(&result.block, &result.env, 3).await.is_err() { - debug!("error handling inbound block"); - continue; - }; } } From 2eca5437ce0dd391142870bd4685ba21241a8318 Mon Sep 17 00:00:00 2001 From: dylan Date: Mon, 9 Jun 2025 22:42:15 -0600 Subject: [PATCH 4/7] fmt --- src/tasks/block/sim.rs | 2 +- src/tasks/submit.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/tasks/block/sim.rs b/src/tasks/block/sim.rs index 22a5e69..9a097cd 100644 --- a/src/tasks/block/sim.rs +++ b/src/tasks/block/sim.rs @@ -95,7 +95,7 @@ impl Simulator { debug!( block_number = block_env.number, ?finish_by, - tx_count= sim_items.len(), + tx_count = sim_items.len(), "starting block build", ); diff --git a/src/tasks/submit.rs b/src/tasks/submit.rs index 7a42d56..cc0421c 100644 --- a/src/tasks/submit.rs +++ b/src/tasks/submit.rs @@ -519,7 +519,7 @@ impl SubmitTask { if let Err(e) = self.retrying_handle_inbound(&result.block, &result.env, 3).await { error!(error = %e, "error handling inbound block"); - continue; + continue; } } } From ed653d06b97872adc5831eab1825f6428c6aa000 Mon Sep 17 00:00:00 2001 From: dylan Date: Mon, 9 Jun 2025 22:47:55 -0600 Subject: [PATCH 5/7] remove verbose logging --- src/tasks/block/sim.rs | 4 ---- 1 file changed, 4 deletions(-) diff --git a/src/tasks/block/sim.rs b/src/tasks/block/sim.rs index 9a097cd..1bcc2a0 100644 --- a/src/tasks/block/sim.rs +++ b/src/tasks/block/sim.rs @@ -198,20 +198,16 @@ impl Simulator { pub fn calculate_deadline(&self) -> Instant { // Get the current timepoint within the slot. let timepoint = self.slot_calculator().current_timepoint_within_slot(); - trace!(timepoint, "current timepoint within slot"); // We have the timepoint in seconds into the slot. To find out what's // remaining, we need to subtract it from the slot duration let remaining = self.slot_calculator().slot_duration() - timepoint; - trace!(remaining, "time remaining in slot"); // We add a 1500 ms buffer to account for sequencer stopping signing. let deadline = Instant::now() + Duration::from_secs(remaining) - Duration::from_millis(1500); - trace!(?deadline, "calculated deadline for block simulation"); let buffered_deadline = deadline.max(Instant::now()); - trace!(?buffered_deadline, "final deadline for block simulation"); buffered_deadline } From d4c167c0d6066d56d9b4b5b51559626497cabe4b Mon Sep 17 00:00:00 2001 From: dylan Date: Mon, 9 Jun 2025 22:48:37 -0600 Subject: [PATCH 6/7] clippy --- src/tasks/block/sim.rs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/src/tasks/block/sim.rs b/src/tasks/block/sim.rs index 1bcc2a0..7e274a4 100644 --- a/src/tasks/block/sim.rs +++ b/src/tasks/block/sim.rs @@ -17,7 +17,6 @@ use tokio::{ }, task::JoinHandle, }; -use tracing::trace; use trevm::revm::{ context::BlockEnv, database::{AlloyDB, WrapDatabaseAsync}, @@ -207,9 +206,7 @@ impl Simulator { let deadline = Instant::now() + Duration::from_secs(remaining) - Duration::from_millis(1500); - let buffered_deadline = deadline.max(Instant::now()); - - buffered_deadline + deadline.max(Instant::now()) } /// Creates an `AlloyDB` instance from the rollup provider. From b516c21187ede0d5ef28d721deee214b0b523dae Mon Sep 17 00:00:00 2001 From: dylan Date: Mon, 9 Jun 2025 23:08:41 -0600 Subject: [PATCH 7/7] fix: don't submit empty blocks --- src/tasks/submit.rs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/tasks/submit.rs b/src/tasks/submit.rs index cc0421c..bcf00df 100644 --- a/src/tasks/submit.rs +++ b/src/tasks/submit.rs @@ -517,6 +517,12 @@ impl SubmitTask { }; debug!(block_number = result.block.block_number(), "submit channel received block"); + // Don't submit empty blocks + if result.block.is_empty() { + debug!("received empty block - skipping"); + continue; + } + if let Err(e) = self.retrying_handle_inbound(&result.block, &result.env, 3).await { error!(error = %e, "error handling inbound block"); continue;