diff --git a/Cargo.toml b/Cargo.toml index b9231f6..68ef17f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,6 +28,8 @@ signet-bundle = { git = "https://github.com/init4tech/signet-sdk", branch = "mai signet-types = { git = "https://github.com/init4tech/signet-sdk", branch = "main" } signet-zenith = { git = "https://github.com/init4tech/signet-sdk", branch = "main" } +trevm = { version = "0.20.4", features = [ "concurrent-db", "test-utils" ]} + alloy = { version = "0.12.6", features = [ "full", "json-rpc", diff --git a/src/tasks/block.rs b/src/tasks/block.rs index a0c9c87..4e26d90 100644 --- a/src/tasks/block.rs +++ b/src/tasks/block.rs @@ -43,6 +43,11 @@ impl InProgressBlock { self.transactions.is_empty() } + /// Returns the current list of transactions included in this block + pub fn transactions(&self) -> Vec { + self.transactions.clone() + } + /// Unseal the block fn unseal(&mut self) { self.raw_encoding.take(); diff --git a/src/tasks/bundler.rs b/src/tasks/bundler.rs index d3ec34e..d1b1220 100644 --- a/src/tasks/bundler.rs +++ b/src/tasks/bundler.rs @@ -4,11 +4,11 @@ use crate::tasks::oauth::Authenticator; use oauth2::TokenResponse; use reqwest::Url; use serde::{Deserialize, Serialize}; +use signet_bundle::SignetEthBundle; use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel}; use tokio::task::JoinHandle; use tokio::time; use tracing::{Instrument, debug, trace}; -use signet_bundle::SignetEthBundle; /// Holds a bundle from the cache with a unique ID and a Zenith bundle. #[derive(Debug, Clone, Serialize, Deserialize)] pub struct Bundle { diff --git a/src/tasks/mod.rs b/src/tasks/mod.rs index e6983af..9e8cd00 100644 --- a/src/tasks/mod.rs +++ b/src/tasks/mod.rs @@ -15,3 +15,6 @@ pub mod submit; /// Tx polling task pub mod tx_poller; + +/// Tx and bundle simulation task +pub mod simulator; diff --git a/src/tasks/simulator.rs b/src/tasks/simulator.rs new file mode 100644 index 0000000..2e485e7 --- /dev/null +++ b/src/tasks/simulator.rs @@ -0,0 +1,277 @@ +use crate::tasks::block::InProgressBlock; +use alloy::consensus::TxEnvelope; +use alloy::primitives::U256; +use eyre::Result; +use std::sync::Arc; +use tokio::{select, sync::mpsc::UnboundedReceiver, task::JoinSet}; +use trevm::{ + Cfg, DbConnect, NoopBlock, TrevmBuilder, TrevmBuilderError, Tx, + db::{ + cow::CacheOnWrite, + sync::{ConcurrentState, ConcurrentStateInfo}, + }, + helpers::Ctx, + revm::{ + Database, DatabaseCommit, DatabaseRef, Inspector, + context::{ + CfgEnv, + result::{EVMError, ExecutionResult, ResultAndState}, + }, + primitives::address, + state::Account, + }, +}; + +/// Tracks the EVM state, score, and result of an EVM execution. +/// Scores are assigned by the evaluation function, and are Ord +/// or PartialOrd to allow for sorting. +#[derive(Debug, Clone)] +pub struct Best { + /// The transaction being executed. + pub tx: Arc, + /// The result and state of the execution. + pub result: ResultAndState, + /// The score calculated by the evaluation function. + pub score: S, +} + +/// Binds a database and an inspector together for simulation. +#[derive(Debug, Clone)] +pub struct SimulatorFactory { + /// The inspector + pub inspector: Insp, + /// A CacheOnWrite that is cloneable + pub cow: MakeCow, +} + +/// SimResult is an [`Option`] type that holds a tuple of a transaction and its associated +/// state as a [`Db`] type updates if it was successfully executed. +type SimResult = Result, CacheOnWrite>>)>>; + +impl SimulatorFactory +where + Insp: Inspector>>>>> + + Send + + Sync + + Clone + + 'static, + Db: Database + DatabaseRef + DatabaseCommit + Send + Sync + Clone + 'static, + MakeCow: DbConnect>>>, +{ + /// Creates a new Simulator factory from the provided database and inspector. + pub fn new(db: Db, inspector: Insp) -> Self { + let cdb = ConcurrentState::new(db, ConcurrentStateInfo::default()); + let cdb = Arc::new(cdb); + let cow = MakeCow::new(cdb); + + Self { inspector, cow } + } + + /// Spawns a trevm simulator that runs until `deadline` is hit. + /// * Spawn does not guarantee that a thread is finished before the deadline. + /// * This is intentional, so that it can maximize simulation time before the deadline. + /// * This function always returns whatever the latest finished in progress block is. + pub fn spawn( + self, + mut inbound_tx: UnboundedReceiver, + evaluator: Arc, + deadline: tokio::time::Instant, + ) -> tokio::task::JoinHandle + where + F: Fn(&ResultAndState) -> U256 + Send + Sync + 'static, + { + tokio::spawn(async move { + let mut join_set = JoinSet::new(); + let mut block = InProgressBlock::new(); + + let sleep = tokio::time::sleep_until(deadline); + tokio::pin!(sleep); + + loop { + select! { + _ = &mut sleep => { + tracing::debug!("deadline reached, stopping simulation"); + break; + }, + tx = inbound_tx.recv() => { + tracing::debug!(tx = ?tx, "received transaction"); + if let Some(inbound_tx) = tx { + let eval = evaluator.clone(); + let sim = self.clone(); + let db = self.cow.connect().unwrap(); + + join_set.spawn(async move { + let result = sim.simulate_tx(inbound_tx, eval, db.nest()); + match result { + Ok(Some((best, new_db))) => { + tracing::debug!("simulation completed, attempting to update state"); + // TODO: call cow.flatten on the nest instead + tracing::debug!("successfully merged simulation state"); + return Some(best); + } + Ok(None) => { + tracing::debug!("simulation returned no result"); + return None; + } + Err(e) => { + tracing::error!(e = ?e, "failed to simulate transaction"); + return None; + } + } + }); + } + } + Some(result) = join_set.join_next() => { + println!("join_set result"); + match result { + Ok(Some(best)) => { + println!("simulation completed"); + block.ingest_tx(best.tx.as_ref()); + }, + Ok(None) => { + println!("simulation returned no result"); + tracing::debug!("simulation returned no result"); + } + Err(e) => { + println!("simulation returned an error: {}", e); + tracing::error!(e = ?e, "failed to simulate transaction"); + } + } + } + } + } + + block + }) + } + + /// Simulates an inbound tx and applies its state if it's successfully simualted + pub fn simulate_tx( + &self, + tx: TxEnvelope, + evaluator: Arc, + db: CacheOnWrite>>>, + ) -> SimResult + where + F: Fn(&ResultAndState) -> U256 + Send + Sync + 'static, + Db: Database + DatabaseRef + DatabaseCommit + Send + Sync + Clone + 'static, + { + let t = TrevmBuilder::new().with_db(db).with_insp(self.inspector.clone()).build_trevm()?; + + let result = t.fill_cfg(&PecorinoCfg).fill_block(&NoopBlock).fill_tx(&tx).run(); + + match result { + Ok(t) => { + let result = t.result_and_state().clone(); + let db = t.into_db(); + let score = evaluator(&result); + let best = Best { tx: Arc::new(tx), result, score }; + + // Flatten to save the result to the parent and return it + let db = db.flatten(); + + Ok(Some((best, db))) + } + Err(terr) => { + tracing::error!(err = ?terr.error(), "transaction simulation error"); + Ok(None) + } + } + } + + /// Simulates an inbound bundle and applies its state if it's successfully simulated + pub fn simulate_bundle( + &self, + _bundle: Arc>, + _evaluator: Arc, + _db: ConcurrentState>>, + ) -> Option>> + where + T: Tx + Send + Sync + 'static, + F: Fn(&ResultAndState) -> U256 + Send + Sync + 'static, + { + todo!("implement bundle handling") + } +} + +/// MakeCow wraps a ConcurrentState database in an Arc to allow for cloning. +#[derive(Debug, Clone)] +pub struct MakeCow(Arc>); + +impl MakeCow +where + Db: Database + DatabaseRef + DatabaseCommit + Send + Sync + 'static, +{ + /// Returns a new CoW Db that implements Clone for use in DbConnect + pub fn new(db: Arc>) -> Self { + Self(db) + } +} + +impl DbConnect for MakeCow +where + Db: Database + DatabaseRef + DatabaseCommit + Sync + Send + Clone + 'static, +{ + type Database = CacheOnWrite>>; + type Error = TrevmBuilderError; + + /// Connects to the database and returns a CacheOnWrite instance + fn connect(&self) -> Result { + let db: CacheOnWrite>> = CacheOnWrite::new(self.0.clone()); + Ok(db) + } +} + +/// Defines the CfgEnv for Pecorino Network +#[derive(Debug, Clone, Copy)] +pub struct PecorinoCfg; + +impl Cfg for PecorinoCfg { + fn fill_cfg_env(&self, cfg_env: &mut CfgEnv) { + cfg_env.chain_id = 17003; + } +} + +/// Wrap the EVM error in a database error type +pub struct Error(EVMError); + +impl From> for Error +where + Db: Database, +{ + fn from(e: EVMError) -> Self { + Self(e) + } +} + +impl core::error::Error for Error {} + +impl core::fmt::Debug for Error { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + write!(f, "Error") + } +} + +impl core::fmt::Display for Error { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + write!(f, "Error") + } +} + +/// A simple evaluation function as a sane default. +pub fn eval_fn(state: &ResultAndState) -> U256 { + // log the transaction results + match &state.result { + ExecutionResult::Success { .. } => println!("Execution was successful."), + ExecutionResult::Revert { .. } => println!("Execution reverted."), + ExecutionResult::Halt { .. } => println!("Execution halted."), + } + + // return the target account balance + let target_addr = address!("0x0000000000000000000000000000000000000000"); + let default_account = Account::default(); + let target_account = state.state.get(&target_addr).unwrap_or(&default_account); + tracing::info!(balance = ?target_account.info.balance, "target account balance"); + + target_account.info.balance +} diff --git a/tests/simulator_test.rs b/tests/simulator_test.rs new file mode 100644 index 0000000..41d5a42 --- /dev/null +++ b/tests/simulator_test.rs @@ -0,0 +1,102 @@ +use alloy::{ + consensus::{SignableTransaction as _, TxEip1559, TxEnvelope}, + primitives::U256, + signers::{ + SignerSync as _, + local::{LocalSigner, PrivateKeySigner}, + }, +}; +use builder::tasks::simulator::SimulatorFactory; +use std::sync::Arc; +use tokio::{ + sync::mpsc, + time::{Duration, Instant}, +}; +use trevm::revm::{ + context::result::{ExecutionResult, ResultAndState}, + database::{CacheDB, InMemoryDB}, + inspector::NoOpInspector, + primitives::{TxKind, address}, + state::{Account, AccountInfo}, +}; + +#[tokio::test(flavor = "multi_thread")] +async fn test_spawn() { + // Setup transaction pipeline plumbing + let unbounded_channel = mpsc::unbounded_channel::(); + let (tx_sender, tx_receiver) = unbounded_channel; + let (_bundle_sender, _bundle_receiver) = mpsc::unbounded_channel::>(); + let deadline = Instant::now() + Duration::from_secs(5); + + // Create a new anvil instance and test wallets + let anvil = + alloy::node_bindings::Anvil::new().block_time(1).chain_id(14174).try_spawn().unwrap(); + let keys = anvil.keys(); + let test_wallet = &PrivateKeySigner::from(keys[0].clone()); + + // Create a evaluator + let evaluator = Arc::new(test_evaluator); + + // Make a database and seed it with some starting account state + let db = seed_database(CacheDB::new(InMemoryDB::default()), test_wallet); + + // Create a new simulator factory with the given database and inspector + let sim_factory = SimulatorFactory::new(db, NoOpInspector); + + // Spawn the simulator actor + let handle = sim_factory.spawn(tx_receiver, evaluator, deadline); + + // Send transactions to the simulator + for count in 0..2 { + let test_tx = new_test_tx(test_wallet, count).unwrap(); + tx_sender.send(test_tx).unwrap(); + } + + // Wait for simulation to complete + let best = handle.await.unwrap(); + + // Assert on the block + assert_eq!(best.len(), 1); +} + +/// An example of a simple evaluator function for use in testing +fn test_evaluator(state: &ResultAndState) -> U256 { + // log the transaction results + match &state.result { + ExecutionResult::Success { .. } => println!("Execution was successful."), + ExecutionResult::Revert { .. } => println!("Execution reverted."), + ExecutionResult::Halt { .. } => println!("Execution halted."), + } + + // return the target account balance + let target_addr = address!("0x0000000000000000000000000000000000000000"); + let default_account = Account::default(); + let target_account = state.state.get(&target_addr).unwrap_or(&default_account); + tracing::info!(balance = ?target_account.info.balance, "target account balance"); + + target_account.info.balance +} + +// Returns a new signed test transaction with default values +fn new_test_tx(wallet: &PrivateKeySigner, nonce: u64) -> eyre::Result { + let tx = TxEip1559 { + chain_id: 17003, + gas_limit: 50000, + nonce, + to: TxKind::Call(address!("0x0000000000000000000000000000000000000000")), + value: U256::from(1), + input: alloy::primitives::bytes!(""), + ..Default::default() + }; + let signature = wallet.sign_hash_sync(&tx.signature_hash())?; + Ok(TxEnvelope::Eip1559(tx.into_signed(signature))) +} + +// Adds a balance to the given wallet address in the database for simple simulation unit tests +fn seed_database(mut db: CacheDB, wallet: &PrivateKeySigner) -> CacheDB { + let mut info = AccountInfo::default(); + info.balance = U256::from(10000); + db.insert_account_info(wallet.address(), info); + + db +}