Skip to content

Env Task #89

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
May 20, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -65,3 +65,4 @@ tokio = { version = "1.36.0", features = ["full", "macros", "rt-multi-thread"] }
async-trait = "0.1.80"
oauth2 = "4.4.2"
chrono = "0.4.41"
tokio-stream = "0.1.17"
5 changes: 2 additions & 3 deletions bin/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use builder::{
use init4_bin_base::{deps::tracing, utils::from_env::FromEnv};
use signet_sim::SimCache;
use signet_types::constants::SignetSystemConstants;
use std::sync::Arc;
use tokio::select;
use tracing::info_span;

Expand Down Expand Up @@ -44,12 +43,12 @@ async fn main() -> eyre::Result<()> {
let sim_items = SimCache::new();
let slot_calculator = config.slot_calculator;

let sim = Arc::new(Simulator::new(&config, ru_provider.clone(), slot_calculator));
let sim = Simulator::new(&config, ru_provider.clone(), slot_calculator);

let (basefee_jh, sim_cache_jh) =
sim.spawn_cache_tasks(tx_receiver, bundle_receiver, sim_items.clone());

let build_jh = sim.clone().spawn_simulator_task(constants, sim_items.clone(), submit_channel);
let build_jh = sim.spawn_simulator_task(constants, sim_items.clone(), submit_channel);

let server = serve_builder(([0, 0, 0, 0], config.builder_port));

Expand Down
12 changes: 6 additions & 6 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,12 @@ use oauth2::url;
use signet_zenith::Zenith;
use std::borrow::Cow;

/// Type alias for the provider used to simulate against rollup state.
pub type RuProvider = RootProvider<Ethereum>;

/// A [`Zenith`] contract instance using [`Provider`] as the provider.
pub type ZenithInstance<P = HostProvider> = Zenith::ZenithInstance<(), P, alloy::network::Ethereum>;

/// Type alias for the provider used to build and submit blocks to the host.
pub type HostProvider = FillProvider<
JoinFill<
Expand Down Expand Up @@ -158,12 +164,6 @@ pub struct BuilderConfig {
pub slot_calculator: SlotCalculator,
}

/// Type alias for the provider used to simulate against rollup state.
pub type RuProvider = RootProvider<Ethereum>;

/// A [`Zenith`] contract instance using [`Provider`] as the provider.
pub type ZenithInstance<P = HostProvider> = Zenith::ZenithInstance<(), P, alloy::network::Ethereum>;

impl BuilderConfig {
/// Connect to the Builder signer.
pub async fn connect_builder_signer(&self) -> Result<LocalOrAws, SignerError> {
Expand Down
6 changes: 3 additions & 3 deletions src/tasks/block/sim.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,12 +202,12 @@ impl Simulator {
///
/// A `JoinHandle` for the spawned task.
pub fn spawn_simulator_task(
self: Arc<Self>,
self,
constants: SignetSystemConstants,
cache: SimCache,
submit_sender: mpsc::UnboundedSender<BuiltBlock>,
) -> JoinHandle<()> {
debug!("starting builder task");
debug!("starting simulator task");

tokio::spawn(async move { self.run_simulator(constants, cache, submit_sender).await })
}
Expand All @@ -227,7 +227,7 @@ impl Simulator {
/// - `cache`: The simulation cache containing transactions and bundles.
/// - `submit_sender`: A channel sender used to submit built blocks.
async fn run_simulator(
self: Arc<Self>,
self,
constants: SignetSystemConstants,
cache: SimCache,
submit_sender: mpsc::UnboundedSender<BuiltBlock>,
Expand Down
117 changes: 117 additions & 0 deletions src/tasks/env.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
use crate::config::{BuilderConfig, RuProvider};
use alloy::{
consensus::Header,
eips::eip1559::BaseFeeParams,
primitives::{B256, U256},
providers::Provider,
};
use init4_bin_base::deps::tracing::{self, Instrument, debug, error, info_span};
use std::time::Duration;
use tokio::sync::watch;
use tokio_stream::StreamExt;
use trevm::revm::{context::BlockEnv, context_interface::block::BlobExcessGasAndPrice};

/// A task that constructs a BlockEnv for the next block in the rollup chain.
#[derive(Debug, Clone)]
pub struct EnvTask {
config: BuilderConfig,
provider: RuProvider,
}

impl EnvTask {
/// Create a new EnvTask with the given config and provider.
pub const fn new(config: BuilderConfig, provider: RuProvider) -> Self {
Self { config, provider }
}

/// Construct a BlockEnv by making calls to the provider.
pub fn construct_block_env(&self, previous: &Header) -> BlockEnv {
BlockEnv {
number: previous.number + 1,
beneficiary: self.config.builder_rewards_address,
// NB: EXACTLY the same as the previous block
timestamp: previous.number + self.config.slot_calculator.slot_duration(),
gas_limit: self.config.rollup_block_gas_limit,
basefee: previous
.next_block_base_fee(BaseFeeParams::ethereum())
.expect("signet has no non-1559 headers"),
difficulty: U256::ZERO,
prevrandao: Some(B256::random()),
blob_excess_gas_and_price: Some(BlobExcessGasAndPrice {
excess_blob_gas: 0,
blob_gasprice: 0,
}),
}
}

/// Construct the BlockEnv and send it to the sender.
pub async fn task_fut(self, sender: watch::Sender<Option<BlockEnv>>) {
let span = info_span!("EnvTask::task_fut::init");
let mut poller = match self.provider.watch_blocks().instrument(span.clone()).await {
Ok(poller) => poller,
Err(err) => {
let _span = span.enter();
error!(%err, "Failed to watch blocks");
return;
}
};

poller.set_poll_interval(Duration::from_millis(250));

let mut blocks = poller.into_stream();

while let Some(blocks) =
blocks.next().instrument(info_span!("EnvTask::task_fut::stream")).await
{
let Some(block) = blocks.last() else {
// This case occurs when there are no changes to the block,
// so we do nothing.
debug!("empty filter changes");
continue;
};
let span = info_span!("EnvTask::task_fut::loop", hash = %block, number = tracing::field::Empty);

let previous = match self
.provider
.get_block((*block).into())
.into_future()
.instrument(span.clone())
.await
{
Ok(Some(block)) => block.header.inner,
Ok(None) => {
let _span = span.enter();
let _ = sender.send(None);
debug!("block not found");
// This may mean the chain had a rollback, so the next poll
// should find something.
continue;
}
Err(err) => {
let _span = span.enter();
let _ = sender.send(None);
error!(%err, "Failed to get latest block");
// Error may be transient, so we should not break the loop.
continue;
}
};
span.record("number", previous.number);

let env = self.construct_block_env(&previous);
debug!(?env, "constructed block env");
if sender.send(Some(env)).is_err() {
// The receiver has been dropped, so we can stop the task.
break;
}
}
}

/// Spawn the task and return a watch::Receiver for the BlockEnv.
pub fn spawn(self) -> watch::Receiver<Option<BlockEnv>> {
let (sender, receiver) = watch::channel(None);
let fut = self.task_fut(sender);
tokio::spawn(fut);

receiver
}
}
3 changes: 3 additions & 0 deletions src/tasks/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,6 @@ pub mod tx_poller;

/// Block simulation and environment
pub mod block;

/// Constructs the simualtion environment.
pub mod env;
11 changes: 4 additions & 7 deletions tests/block_builder_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,7 @@ mod tests {
use init4_bin_base::utils::calc::SlotCalculator;
use signet_sim::{SimCache, SimItem};
use signet_types::constants::SignetSystemConstants;
use std::{
sync::Arc,
time::{Duration, Instant, SystemTime, UNIX_EPOCH},
};
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
use tokio::{sync::mpsc::unbounded_channel, time::timeout};

/// Tests the `handle_build` method of the `Simulator`.
Expand Down Expand Up @@ -108,16 +105,16 @@ mod tests {
// Create a rollup provider
let ru_provider = RootProvider::<Ethereum>::new_http(anvil_instance.endpoint_url());

let sim = Arc::new(Simulator::new(&config, ru_provider.clone(), config.slot_calculator));
let sim = Simulator::new(&config, ru_provider.clone(), config.slot_calculator);

// Create a shared sim cache
let sim_cache = SimCache::new();

// Create a sim cache and start filling it with items
sim.clone().spawn_cache_tasks(tx_receiver, bundle_receiver, sim_cache.clone());
sim.spawn_cache_tasks(tx_receiver, bundle_receiver, sim_cache.clone());

// Finally, Kick off the block builder task.
sim.clone().spawn_simulator_task(constants, sim_cache.clone(), block_sender);
sim.spawn_simulator_task(constants, sim_cache.clone(), block_sender);

// Feed in transactions to the tx_sender and wait for the block to be simulated
let tx_1 = new_signed_tx(&test_key_0, 0, U256::from(1_f64), 11_000).unwrap();
Expand Down