From 4cab4728bee64bb2f58c45ca6be58067796e7372 Mon Sep 17 00:00:00 2001 From: James Date: Wed, 14 May 2025 08:53:26 -0400 Subject: [PATCH] fix: remove arc and clones --- Cargo.toml | 1 + bin/builder.rs | 5 +- src/config.rs | 12 ++-- src/tasks/block/sim.rs | 6 +- src/tasks/env.rs | 117 ++++++++++++++++++++++++++++++++++++ src/tasks/mod.rs | 3 + tests/block_builder_test.rs | 11 ++-- 7 files changed, 136 insertions(+), 19 deletions(-) create mode 100644 src/tasks/env.rs diff --git a/Cargo.toml b/Cargo.toml index dd8b233..c7ccc14 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -65,3 +65,4 @@ tokio = { version = "1.36.0", features = ["full", "macros", "rt-multi-thread"] } async-trait = "0.1.80" oauth2 = "4.4.2" chrono = "0.4.41" +tokio-stream = "0.1.17" diff --git a/bin/builder.rs b/bin/builder.rs index 2356013..2eef970 100644 --- a/bin/builder.rs +++ b/bin/builder.rs @@ -6,7 +6,6 @@ use builder::{ use init4_bin_base::{deps::tracing, utils::from_env::FromEnv}; use signet_sim::SimCache; use signet_types::constants::SignetSystemConstants; -use std::sync::Arc; use tokio::select; use tracing::info_span; @@ -44,12 +43,12 @@ async fn main() -> eyre::Result<()> { let sim_items = SimCache::new(); let slot_calculator = config.slot_calculator; - let sim = Arc::new(Simulator::new(&config, ru_provider.clone(), slot_calculator)); + let sim = Simulator::new(&config, ru_provider.clone(), slot_calculator); let (basefee_jh, sim_cache_jh) = sim.spawn_cache_tasks(tx_receiver, bundle_receiver, sim_items.clone()); - let build_jh = sim.clone().spawn_simulator_task(constants, sim_items.clone(), submit_channel); + let build_jh = sim.spawn_simulator_task(constants, sim_items.clone(), submit_channel); let server = serve_builder(([0, 0, 0, 0], config.builder_port)); diff --git a/src/config.rs b/src/config.rs index 61d1808..c795fc1 100644 --- a/src/config.rs +++ b/src/config.rs @@ -20,6 +20,12 @@ use oauth2::url; use signet_zenith::Zenith; use std::borrow::Cow; +/// Type alias for the provider used to simulate against rollup state. +pub type RuProvider = RootProvider; + +/// A [`Zenith`] contract instance using [`Provider`] as the provider. +pub type ZenithInstance

= Zenith::ZenithInstance<(), P, alloy::network::Ethereum>; + /// Type alias for the provider used to build and submit blocks to the host. pub type HostProvider = FillProvider< JoinFill< @@ -158,12 +164,6 @@ pub struct BuilderConfig { pub slot_calculator: SlotCalculator, } -/// Type alias for the provider used to simulate against rollup state. -pub type RuProvider = RootProvider; - -/// A [`Zenith`] contract instance using [`Provider`] as the provider. -pub type ZenithInstance

= Zenith::ZenithInstance<(), P, alloy::network::Ethereum>; - impl BuilderConfig { /// Connect to the Builder signer. pub async fn connect_builder_signer(&self) -> Result { diff --git a/src/tasks/block/sim.rs b/src/tasks/block/sim.rs index 09b7b26..fec1260 100644 --- a/src/tasks/block/sim.rs +++ b/src/tasks/block/sim.rs @@ -202,12 +202,12 @@ impl Simulator { /// /// A `JoinHandle` for the spawned task. pub fn spawn_simulator_task( - self: Arc, + self, constants: SignetSystemConstants, cache: SimCache, submit_sender: mpsc::UnboundedSender, ) -> JoinHandle<()> { - debug!("starting builder task"); + debug!("starting simulator task"); tokio::spawn(async move { self.run_simulator(constants, cache, submit_sender).await }) } @@ -227,7 +227,7 @@ impl Simulator { /// - `cache`: The simulation cache containing transactions and bundles. /// - `submit_sender`: A channel sender used to submit built blocks. async fn run_simulator( - self: Arc, + self, constants: SignetSystemConstants, cache: SimCache, submit_sender: mpsc::UnboundedSender, diff --git a/src/tasks/env.rs b/src/tasks/env.rs new file mode 100644 index 0000000..75cf4d4 --- /dev/null +++ b/src/tasks/env.rs @@ -0,0 +1,117 @@ +use crate::config::{BuilderConfig, RuProvider}; +use alloy::{ + consensus::Header, + eips::eip1559::BaseFeeParams, + primitives::{B256, U256}, + providers::Provider, +}; +use init4_bin_base::deps::tracing::{self, Instrument, debug, error, info_span}; +use std::time::Duration; +use tokio::sync::watch; +use tokio_stream::StreamExt; +use trevm::revm::{context::BlockEnv, context_interface::block::BlobExcessGasAndPrice}; + +/// A task that constructs a BlockEnv for the next block in the rollup chain. +#[derive(Debug, Clone)] +pub struct EnvTask { + config: BuilderConfig, + provider: RuProvider, +} + +impl EnvTask { + /// Create a new EnvTask with the given config and provider. + pub const fn new(config: BuilderConfig, provider: RuProvider) -> Self { + Self { config, provider } + } + + /// Construct a BlockEnv by making calls to the provider. + pub fn construct_block_env(&self, previous: &Header) -> BlockEnv { + BlockEnv { + number: previous.number + 1, + beneficiary: self.config.builder_rewards_address, + // NB: EXACTLY the same as the previous block + timestamp: previous.number + self.config.slot_calculator.slot_duration(), + gas_limit: self.config.rollup_block_gas_limit, + basefee: previous + .next_block_base_fee(BaseFeeParams::ethereum()) + .expect("signet has no non-1559 headers"), + difficulty: U256::ZERO, + prevrandao: Some(B256::random()), + blob_excess_gas_and_price: Some(BlobExcessGasAndPrice { + excess_blob_gas: 0, + blob_gasprice: 0, + }), + } + } + + /// Construct the BlockEnv and send it to the sender. + pub async fn task_fut(self, sender: watch::Sender>) { + let span = info_span!("EnvTask::task_fut::init"); + let mut poller = match self.provider.watch_blocks().instrument(span.clone()).await { + Ok(poller) => poller, + Err(err) => { + let _span = span.enter(); + error!(%err, "Failed to watch blocks"); + return; + } + }; + + poller.set_poll_interval(Duration::from_millis(250)); + + let mut blocks = poller.into_stream(); + + while let Some(blocks) = + blocks.next().instrument(info_span!("EnvTask::task_fut::stream")).await + { + let Some(block) = blocks.last() else { + // This case occurs when there are no changes to the block, + // so we do nothing. + debug!("empty filter changes"); + continue; + }; + let span = info_span!("EnvTask::task_fut::loop", hash = %block, number = tracing::field::Empty); + + let previous = match self + .provider + .get_block((*block).into()) + .into_future() + .instrument(span.clone()) + .await + { + Ok(Some(block)) => block.header.inner, + Ok(None) => { + let _span = span.enter(); + let _ = sender.send(None); + debug!("block not found"); + // This may mean the chain had a rollback, so the next poll + // should find something. + continue; + } + Err(err) => { + let _span = span.enter(); + let _ = sender.send(None); + error!(%err, "Failed to get latest block"); + // Error may be transient, so we should not break the loop. + continue; + } + }; + span.record("number", previous.number); + + let env = self.construct_block_env(&previous); + debug!(?env, "constructed block env"); + if sender.send(Some(env)).is_err() { + // The receiver has been dropped, so we can stop the task. + break; + } + } + } + + /// Spawn the task and return a watch::Receiver for the BlockEnv. + pub fn spawn(self) -> watch::Receiver> { + let (sender, receiver) = watch::channel(None); + let fut = self.task_fut(sender); + tokio::spawn(fut); + + receiver + } +} diff --git a/src/tasks/mod.rs b/src/tasks/mod.rs index 82d3e40..bc26149 100644 --- a/src/tasks/mod.rs +++ b/src/tasks/mod.rs @@ -15,3 +15,6 @@ pub mod tx_poller; /// Block simulation and environment pub mod block; + +/// Constructs the simualtion environment. +pub mod env; diff --git a/tests/block_builder_test.rs b/tests/block_builder_test.rs index 79d4ee6..93d2038 100644 --- a/tests/block_builder_test.rs +++ b/tests/block_builder_test.rs @@ -15,10 +15,7 @@ mod tests { use init4_bin_base::utils::calc::SlotCalculator; use signet_sim::{SimCache, SimItem}; use signet_types::constants::SignetSystemConstants; - use std::{ - sync::Arc, - time::{Duration, Instant, SystemTime, UNIX_EPOCH}, - }; + use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; use tokio::{sync::mpsc::unbounded_channel, time::timeout}; /// Tests the `handle_build` method of the `Simulator`. @@ -108,16 +105,16 @@ mod tests { // Create a rollup provider let ru_provider = RootProvider::::new_http(anvil_instance.endpoint_url()); - let sim = Arc::new(Simulator::new(&config, ru_provider.clone(), config.slot_calculator)); + let sim = Simulator::new(&config, ru_provider.clone(), config.slot_calculator); // Create a shared sim cache let sim_cache = SimCache::new(); // Create a sim cache and start filling it with items - sim.clone().spawn_cache_tasks(tx_receiver, bundle_receiver, sim_cache.clone()); + sim.spawn_cache_tasks(tx_receiver, bundle_receiver, sim_cache.clone()); // Finally, Kick off the block builder task. - sim.clone().spawn_simulator_task(constants, sim_cache.clone(), block_sender); + sim.spawn_simulator_task(constants, sim_cache.clone(), block_sender); // Feed in transactions to the tx_sender and wait for the block to be simulated let tx_1 = new_signed_tx(&test_key_0, 0, U256::from(1_f64), 11_000).unwrap();