From f3dd2c8559a49c6be4a87a46ba2c011436f66db6 Mon Sep 17 00:00:00 2001 From: James Date: Wed, 14 May 2025 08:51:51 -0400 Subject: [PATCH 1/3] refactor: continuing to clean --- bin/builder.rs | 2 +- src/tasks/block.rs | 72 +++++++++++++++++++++------------------------- 2 files changed, 33 insertions(+), 41 deletions(-) diff --git a/bin/builder.rs b/bin/builder.rs index 362755a..b9811ff 100644 --- a/bin/builder.rs +++ b/bin/builder.rs @@ -47,7 +47,7 @@ async fn main() -> eyre::Result<()> { let sim = Arc::new(Simulator::new(&config, ru_provider.clone(), slot_calculator)); let (basefee_jh, sim_cache_jh) = - sim.clone().spawn_cache_tasks(tx_receiver, bundle_receiver, sim_items.clone()); + sim.spawn_cache_tasks(tx_receiver, bundle_receiver, sim_items.clone()); let build_jh = sim.clone().spawn_simulator_task(constants, sim_items.clone(), submit_channel); diff --git a/src/tasks/block.rs b/src/tasks/block.rs index b18095b..e674d2a 100644 --- a/src/tasks/block.rs +++ b/src/tasks/block.rs @@ -130,7 +130,7 @@ impl Simulator { /// A `JoinHandle` for the basefee updater and a `JoinHandle` for the /// cache handler. pub fn spawn_cache_tasks( - self: Arc, + &self, tx_receiver: mpsc::UnboundedReceiver, bundle_receiver: mpsc::UnboundedReceiver, cache: SimCache, @@ -139,9 +139,10 @@ impl Simulator { let basefee_price = Arc::new(AtomicU64::new(0_u64)); let basefee_reader = Arc::clone(&basefee_price); + let fut = self.basefee_updater_fut(basefee_price); // Update the basefee on a per-block cadence - let basefee_jh = tokio::spawn(async move { self.basefee_updater(basefee_price).await }); + let basefee_jh = tokio::spawn(fut); // Update the sim cache whenever a transaction or bundle is received with respect to the basefee let cache_jh = tokio::spawn(async move { @@ -162,45 +163,36 @@ impl Simulator { /// # Arguments /// /// - `price`: A shared `Arc` used to store the updated basefee value. - async fn basefee_updater(self: Arc, price: Arc) { - debug!("starting basefee updater"); - loop { - // calculate start of next slot plus a small buffer - let time_remaining = self.slot_calculator.slot_duration() - - self.slot_calculator.current_timepoint_within_slot() - + 1; - debug!(time_remaining = ?time_remaining, "basefee updater sleeping until next slot"); - - // wait until that point in time - sleep(Duration::from_secs(time_remaining)).await; - - // update the basefee with that price - self.check_basefee(&price).await; - } - } - - /// Queries the latest block from the rollup provider and updates the shared - /// basefee value if a block is found. - /// - /// This function retrieves the latest block using the provider, extracts the - /// `base_fee_per_gas` field from the block header (defaulting to zero if missing), - /// and updates the shared `AtomicU64` price tracker. If no block is available, - /// it logs a message without updating the price. - /// - /// # Arguments - /// - /// - `price`: A shared `Arc` used to store the updated basefee. - async fn check_basefee(&self, price: &Arc) { - let resp = self.ru_provider.get_block_by_number(Latest).await.inspect_err(|e| { - error!(error = %e, "RPC error during basefee update"); - }); + fn basefee_updater_fut(&self, price: Arc) -> impl Future + use<>{ + let slot_calculator = self.slot_calculator.clone(); + let ru_provider = self.ru_provider.clone(); + + async move { + debug!("starting basefee updater"); + loop { + // calculate start of next slot plus a small buffer + let time_remaining = slot_calculator.slot_duration() + - slot_calculator.current_timepoint_within_slot() + + 1; + debug!(time_remaining = ?time_remaining, "basefee updater sleeping until next slot"); + + // wait until that point in time + sleep(Duration::from_secs(time_remaining)).await; + + // update the basefee with that price + let resp = ru_provider.get_block_by_number(Latest).await.inspect_err(|e| { + error!(error = %e, "RPC error during basefee update"); + }); + + if let Ok(Some(block)) = resp { + let basefee = block.header.base_fee_per_gas.unwrap_or(0); + price.store(basefee, Ordering::Relaxed); + debug!(basefee = basefee, "basefee updated"); + } else { + warn!("get basefee failed - an error likely occurred"); + } + } - if let Ok(Some(block)) = resp { - let basefee = block.header.base_fee_per_gas.unwrap_or(0); - price.store(basefee, Ordering::Relaxed); - debug!(basefee = basefee, "basefee updated"); - } else { - warn!("get basefee failed - an error likely occurred"); } } From 499d87a35d7a955784d08f11a8e0086e51c69de6 Mon Sep 17 00:00:00 2001 From: James Date: Wed, 14 May 2025 11:26:47 -0400 Subject: [PATCH 2/3] lint: fmt --- src/tasks/block.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/tasks/block.rs b/src/tasks/block.rs index e674d2a..d8c6112 100644 --- a/src/tasks/block.rs +++ b/src/tasks/block.rs @@ -163,7 +163,7 @@ impl Simulator { /// # Arguments /// /// - `price`: A shared `Arc` used to store the updated basefee value. - fn basefee_updater_fut(&self, price: Arc) -> impl Future + use<>{ + fn basefee_updater_fut(&self, price: Arc) -> impl Future + use<> { let slot_calculator = self.slot_calculator.clone(); let ru_provider = self.ru_provider.clone(); @@ -192,7 +192,6 @@ impl Simulator { warn!("get basefee failed - an error likely occurred"); } } - } } From 7f8b7c3a95c1a5f938ad07531b053b79c5b09bf0 Mon Sep 17 00:00:00 2001 From: James Date: Wed, 14 May 2025 11:27:01 -0400 Subject: [PATCH 3/3] lint: clippy --- src/tasks/block.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/tasks/block.rs b/src/tasks/block.rs index d8c6112..4718581 100644 --- a/src/tasks/block.rs +++ b/src/tasks/block.rs @@ -164,7 +164,7 @@ impl Simulator { /// /// - `price`: A shared `Arc` used to store the updated basefee value. fn basefee_updater_fut(&self, price: Arc) -> impl Future + use<> { - let slot_calculator = self.slot_calculator.clone(); + let slot_calculator = self.slot_calculator; let ru_provider = self.ru_provider.clone(); async move {