Skip to content

chore: remove unnecessary arc #91

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
May 14, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion bin/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ async fn main() -> eyre::Result<()> {
let sim = Arc::new(Simulator::new(&config, ru_provider.clone(), slot_calculator));

let (basefee_jh, sim_cache_jh) =
sim.clone().spawn_cache_tasks(tx_receiver, bundle_receiver, sim_items.clone());
sim.spawn_cache_tasks(tx_receiver, bundle_receiver, sim_items.clone());

let build_jh = sim.clone().spawn_simulator_task(constants, sim_items.clone(), submit_channel);

Expand Down
73 changes: 32 additions & 41 deletions src/tasks/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ impl Simulator {
/// A `JoinHandle` for the basefee updater and a `JoinHandle` for the
/// cache handler.
pub fn spawn_cache_tasks(
self: Arc<Self>,
&self,
tx_receiver: mpsc::UnboundedReceiver<TxEnvelope>,
bundle_receiver: mpsc::UnboundedReceiver<Bundle>,
cache: SimCache,
Expand All @@ -139,9 +139,10 @@ impl Simulator {

let basefee_price = Arc::new(AtomicU64::new(0_u64));
let basefee_reader = Arc::clone(&basefee_price);
let fut = self.basefee_updater_fut(basefee_price);

// Update the basefee on a per-block cadence
let basefee_jh = tokio::spawn(async move { self.basefee_updater(basefee_price).await });
let basefee_jh = tokio::spawn(fut);

// Update the sim cache whenever a transaction or bundle is received with respect to the basefee
let cache_jh = tokio::spawn(async move {
Expand All @@ -162,45 +163,35 @@ impl Simulator {
/// # Arguments
///
/// - `price`: A shared `Arc<AtomicU64>` used to store the updated basefee value.
async fn basefee_updater(self: Arc<Self>, price: Arc<AtomicU64>) {
debug!("starting basefee updater");
loop {
// calculate start of next slot plus a small buffer
let time_remaining = self.slot_calculator.slot_duration()
- self.slot_calculator.current_timepoint_within_slot()
+ 1;
debug!(time_remaining = ?time_remaining, "basefee updater sleeping until next slot");

// wait until that point in time
sleep(Duration::from_secs(time_remaining)).await;

// update the basefee with that price
self.check_basefee(&price).await;
}
}

/// Queries the latest block from the rollup provider and updates the shared
/// basefee value if a block is found.
///
/// This function retrieves the latest block using the provider, extracts the
/// `base_fee_per_gas` field from the block header (defaulting to zero if missing),
/// and updates the shared `AtomicU64` price tracker. If no block is available,
/// it logs a message without updating the price.
///
/// # Arguments
///
/// - `price`: A shared `Arc<AtomicU64>` used to store the updated basefee.
async fn check_basefee(&self, price: &Arc<AtomicU64>) {
let resp = self.ru_provider.get_block_by_number(Latest).await.inspect_err(|e| {
error!(error = %e, "RPC error during basefee update");
});

if let Ok(Some(block)) = resp {
let basefee = block.header.base_fee_per_gas.unwrap_or(0);
price.store(basefee, Ordering::Relaxed);
debug!(basefee = basefee, "basefee updated");
} else {
warn!("get basefee failed - an error likely occurred");
fn basefee_updater_fut(&self, price: Arc<AtomicU64>) -> impl Future<Output = ()> + use<> {
let slot_calculator = self.slot_calculator;
let ru_provider = self.ru_provider.clone();

async move {
debug!("starting basefee updater");
loop {
// calculate start of next slot plus a small buffer
let time_remaining = slot_calculator.slot_duration()
- slot_calculator.current_timepoint_within_slot()
+ 1;
debug!(time_remaining = ?time_remaining, "basefee updater sleeping until next slot");

// wait until that point in time
sleep(Duration::from_secs(time_remaining)).await;

// update the basefee with that price
let resp = ru_provider.get_block_by_number(Latest).await.inspect_err(|e| {
error!(error = %e, "RPC error during basefee update");
});

if let Ok(Some(block)) = resp {
let basefee = block.header.base_fee_per_gas.unwrap_or(0);
price.store(basefee, Ordering::Relaxed);
debug!(basefee = basefee, "basefee updated");
} else {
warn!("get basefee failed - an error likely occurred");
}
}
}
}

Expand Down