Skip to content

Commit 946e7d2

Browse files
committed
adds simulation factory to builder
- adds initial implementation of simulator task to the builder
1 parent d8681d0 commit 946e7d2

File tree

6 files changed

+447
-0
lines changed

6 files changed

+447
-0
lines changed

Cargo.toml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,13 @@ alloy = { version = "0.12.6", features = [
3838
"serde",
3939
] }
4040

41+
trevm = { version = "0.19.12", features = [ "concurrent-db" ]}
42+
revm = { version = "19.5.0", features = [ "alloydb" ]}
43+
44+
# HACK: Update these to use main alloy package
45+
alloy-provider = { version = "0.7.3" }
46+
alloy-eips = { version = "0.7.3" }
47+
4148
aws-config = "1.1.7"
4249
aws-sdk-kms = "1.15.0"
4350

src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,5 +27,7 @@ pub mod tasks;
2727
/// Utilities.
2828
pub mod utils;
2929

30+
use alloy_eips as _;
31+
use alloy_provider as _;
3032
/// Anonymous crate dependency imports.
3133
use openssl as _;

src/tasks/block.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,11 @@ impl InProgressBlock {
4343
self.transactions.is_empty()
4444
}
4545

46+
/// Returns the current list of transactions included in this block
47+
pub fn transactions(&self) -> Vec<TxEnvelope> {
48+
self.transactions.clone()
49+
}
50+
4651
/// Unseal the block
4752
fn unseal(&mut self) {
4853
self.raw_encoding.take();

src/tasks/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,3 +15,6 @@ pub mod submit;
1515

1616
/// Tx polling task
1717
pub mod tx_poller;
18+
19+
/// Tx and bundle simulation task
20+
pub mod simulator;

src/tasks/simulator.rs

Lines changed: 333 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,333 @@
1+
use crate::tasks::block::InProgressBlock;
2+
use alloy::consensus::TxEnvelope;
3+
use alloy::primitives::U256;
4+
use eyre::Result;
5+
use revm::{db::CacheDB, primitives::{address, Account, CfgEnv, ExecutionResult}, DatabaseRef};
6+
use std::{convert::Infallible, sync::Arc};
7+
use tokio::{select, sync::mpsc::UnboundedReceiver, task::JoinSet};
8+
use trevm::{
9+
db::sync::{ConcurrentState, ConcurrentStateInfo},
10+
revm::{
11+
primitives::{EVMError, ResultAndState},
12+
Database, DatabaseCommit, EvmBuilder,
13+
},
14+
BlockDriver, Cfg, DbConnect, EvmFactory, NoopBlock, TrevmBuilder, Tx,
15+
};
16+
17+
/// Tracks the EVM state, score, and result of an EVM execution.
18+
/// Scores are assigned by the evaluation function, and are Ord
19+
/// or PartialOrd to allow for sorting.
20+
#[derive(Debug, Clone)]
21+
pub struct Best<T, S: PartialOrd + Ord = U256> {
22+
/// The transaction being executed.
23+
pub tx: Arc<T>,
24+
/// The result and state of the execution.
25+
pub result: ResultAndState,
26+
/// The score calculated by the evaluation function.
27+
pub score: S,
28+
}
29+
30+
/// Binds a database and an extension together.
31+
#[derive(Debug, Clone)]
32+
pub struct SimulatorFactory<Db, Ext> {
33+
/// The database state the execution is carried out on.
34+
pub db: Db,
35+
/// The extension, if any, provided to the trevm instance.
36+
pub ext: Ext,
37+
}
38+
39+
/// SimResult is an [`Option`] type that holds a tuple of a transaction and its associated
40+
/// state as a [`Db`] type updates if it was successfully executed.
41+
type SimResult<Db> = Option<(Best<TxEnvelope>, ConcurrentState<Arc<ConcurrentState<Db>>>)>;
42+
43+
impl<Db, Ext> SimulatorFactory<Db, Ext>
44+
where
45+
Ext: Send + Sync + Clone + 'static,
46+
Db: Database + DatabaseRef + DatabaseCommit + Send + Sync + Clone + 'static,
47+
{
48+
/// Creates a new Simulator factory out of the database and extension.
49+
pub const fn new(db: Db, ext: Ext) -> Self {
50+
Self { db, ext }
51+
}
52+
53+
/// Spawns a trevm simulator that runs until `deadline` is hit.
54+
/// * Spawn does not guarantee that a thread is finished before the deadline.
55+
/// * This is intentional, so that it can maximize simulation time before the deadline.
56+
/// * This function always returns whatever the latest finished in progress block is.
57+
pub fn spawn<T, F>(
58+
self,
59+
mut inbound_tx: UnboundedReceiver<TxEnvelope>,
60+
evaluator: Arc<F>,
61+
deadline: tokio::time::Instant,
62+
) -> tokio::task::JoinHandle<InProgressBlock>
63+
where
64+
T: Tx,
65+
F: Fn(&ResultAndState) -> U256 + Send + Sync + 'static,
66+
{
67+
tokio::spawn(async move {
68+
// Spawn a join set to track all simulation threads
69+
let mut join_set = JoinSet::new();
70+
71+
let mut best: Option<Best<TxEnvelope>> = None;
72+
73+
let mut block = InProgressBlock::new();
74+
75+
let sleep = tokio::time::sleep_until(deadline);
76+
tokio::pin!(sleep);
77+
78+
loop {
79+
select! {
80+
_ = &mut sleep => break,
81+
// Handle incoming
82+
tx = inbound_tx.recv() => {
83+
if let Some(inbound_tx) = tx {
84+
// Setup the simulation environment
85+
let sim = self.clone();
86+
let eval = evaluator.clone();
87+
let mut parent_db = Arc::new(sim.connect().unwrap());
88+
89+
// Kick off the work in a new thread
90+
join_set.spawn(async move {
91+
let result = sim.simulate_tx(inbound_tx, eval, parent_db.child());
92+
93+
if let Some((best, db)) = result {
94+
if let Ok(()) = parent_db.merge_child(db) {
95+
tracing::debug!("merging updated simulation state");
96+
return Some(best)
97+
}
98+
tracing::error!("failed to update simulation state");
99+
None
100+
} else {
101+
None
102+
}
103+
});
104+
}
105+
}
106+
Some(result) = join_set.join_next() => {
107+
match result {
108+
Ok(Some(candidate)) => {
109+
tracing::info!(tx_hash = ?candidate.tx.tx_hash(), "ingesting transaction");
110+
block.ingest_tx(candidate.tx.as_ref());
111+
112+
if candidate.score > best.as_ref().map(|b| b.score).unwrap_or_default() {
113+
tracing::info!(score = ?candidate.score, "new best candidate found");
114+
best = Some(candidate);
115+
}
116+
}
117+
Ok(None) => {
118+
tracing::debug!("simulation returned no result");
119+
}
120+
Err(e) => {
121+
tracing::error!("simulation task failed: {}", e);
122+
}
123+
}
124+
}
125+
else => break,
126+
}
127+
}
128+
129+
block
130+
})
131+
}
132+
133+
/// Simulates an inbound tx and applies its state if it's successfully simualted
134+
pub fn simulate_tx<F>(
135+
self,
136+
tx: TxEnvelope,
137+
evaluator: Arc<F>,
138+
db: ConcurrentState<Arc<ConcurrentState<Db>>>,
139+
) -> SimResult<Db>
140+
where
141+
F: Fn(&ResultAndState) -> U256 + Send + Sync + 'static,
142+
Db: Database + DatabaseRef + DatabaseCommit + Send + Sync + Clone + 'static,
143+
{
144+
let trevm_instance = EvmBuilder::default().with_db(db).build_trevm();
145+
146+
let result = trevm_instance
147+
.fill_cfg(&PecorinoCfg)
148+
.fill_block(&NoopBlock)
149+
.fill_tx(&tx) // Use as_ref() to get &SimTxEnvelope from Arc
150+
.run();
151+
152+
match result {
153+
Ok(t) => {
154+
// log and evaluate simulation results
155+
tracing::info!(tx_hash = ?tx.clone().tx_hash(), "transaction simulated");
156+
let result = t.result_and_state().clone();
157+
tracing::debug!(gas_used = &result.result.gas_used(), "gas consumed");
158+
let score = evaluator(&result);
159+
tracing::debug!(score = ?score, "transaction evaluated");
160+
161+
// accept results
162+
let t = t.accept();
163+
let db = t.1.into_db();
164+
165+
// return the updated db with the candidate applied to its state
166+
Some((Best { tx: Arc::new(tx), result, score }, db))
167+
}
168+
Err(e) => {
169+
// if this transaction fails to run, log the error and return None
170+
tracing::error!(err = ?e.as_transaction_error(), "failed to simulate tx");
171+
None
172+
}
173+
}
174+
}
175+
176+
/// Simulates an inbound bundle and applies its state if it's successfully simulated
177+
pub fn simulate_bundle<T, F>(
178+
&self,
179+
_bundle: Arc<Vec<T>>,
180+
_evaluator: Arc<F>,
181+
_trevm_instance: trevm::EvmNeedsCfg<'_, (), ConcurrentState<CacheDB<Arc<Db>>>>,
182+
) -> Option<Best<Vec<T>>>
183+
where
184+
T: Tx + Send + Sync + 'static,
185+
F: Fn(&ResultAndState) -> U256 + Send + Sync + 'static,
186+
{
187+
todo!("implement bundle handling")
188+
}
189+
}
190+
191+
/// Wraps a Db into an EvmFactory compatible [`Database`]
192+
impl<'a, Db, Ext> DbConnect<'a> for SimulatorFactory<Db, Ext>
193+
where
194+
Db: Database + DatabaseRef + DatabaseCommit + Sync + Send + Clone + 'static,
195+
Ext: Sync + Clone,
196+
{
197+
type Database = ConcurrentState<Db>;
198+
type Error = Infallible;
199+
200+
fn connect(&'a self) -> Result<Self::Database, Self::Error> {
201+
let inner = ConcurrentState::new(self.db.clone(), ConcurrentStateInfo::default());
202+
Ok(inner)
203+
}
204+
}
205+
206+
/// Makes a SimulatorFactory capable of creating and configuring trevm instances
207+
impl<'a, Db, Ext> EvmFactory<'a> for SimulatorFactory<Db, Ext>
208+
where
209+
Db: Database + DatabaseRef + DatabaseCommit + Sync + Send + Clone + 'static,
210+
Ext: Sync + Clone,
211+
{
212+
type Ext = ();
213+
214+
/// Create makes a [`ConcurrentState`] database by calling connect
215+
fn create(&'a self) -> Result<trevm::EvmNeedsCfg<'a, Self::Ext, Self::Database>, Self::Error> {
216+
let db = self.connect()?;
217+
let trevm = trevm::revm::EvmBuilder::default().with_db(db).build_trevm();
218+
Ok(trevm)
219+
}
220+
}
221+
222+
/// A trait for extracting transactions from
223+
pub trait BlockExtractor<Ext, Db: Database + DatabaseCommit>: Send + Sync + 'static {
224+
/// BlockDriver runs the transactions over the provided trevm instance.
225+
type Driver: BlockDriver<Ext, Error<Db>: core::error::Error>;
226+
227+
/// Instantiate an configure a new [`trevm`] instance.
228+
fn trevm(&self, db: Db) -> trevm::EvmNeedsBlock<'static, Ext, Db>;
229+
230+
/// Extracts transactions from the source.
231+
///
232+
/// Extraction is infallible. Worst case it should return a no-op driver.
233+
fn extract(&mut self, bytes: &[u8]) -> Self::Driver;
234+
}
235+
236+
impl<Ext> BlockDriver<Ext> for InProgressBlock {
237+
type Block = NoopBlock;
238+
239+
type Error<Db: Database + DatabaseCommit> = Error<Db>;
240+
241+
fn block(&self) -> &Self::Block {
242+
&NoopBlock
243+
}
244+
245+
/// Loops through the transactions in the block and runs them, accepting the state at the end
246+
/// if it was successful and returning and erroring out otherwise.
247+
fn run_txns<'a, Db: Database + DatabaseCommit>(
248+
&mut self,
249+
mut trevm: trevm::EvmNeedsTx<'a, Ext, Db>,
250+
) -> trevm::RunTxResult<'a, Ext, Db, Self> {
251+
for tx in self.transactions().iter() {
252+
if tx.recover_signer().is_ok() {
253+
let sender = tx.recover_signer().unwrap();
254+
tracing::info!(sender = ?sender, tx_hash = ?tx.tx_hash(), "simulating transaction");
255+
256+
let t = match trevm.run_tx(tx) {
257+
Ok(t) => t,
258+
Err(e) => {
259+
if e.is_transaction_error() {
260+
return Ok(e.discard_error());
261+
} else {
262+
return Err(e.err_into());
263+
}
264+
}
265+
};
266+
267+
(_, trevm) = t.accept();
268+
}
269+
}
270+
Ok(trevm)
271+
}
272+
273+
fn post_block<Db: Database + DatabaseCommit>(
274+
&mut self,
275+
_trevm: &trevm::EvmNeedsBlock<'_, Ext, Db>,
276+
) -> Result<(), Self::Error<Db>> {
277+
Ok(())
278+
}
279+
}
280+
281+
/// Defines the CfgEnv for Pecorino Network
282+
#[derive(Debug, Clone, Copy)]
283+
pub struct PecorinoCfg;
284+
285+
impl Cfg for PecorinoCfg {
286+
fn fill_cfg_env(&self, cfg_env: &mut CfgEnv) {
287+
cfg_env.chain_id = 17003;
288+
}
289+
}
290+
291+
/// Wrap the EVM error in a database error type
292+
pub struct Error<Db: Database>(EVMError<Db::Error>);
293+
294+
impl<Db> From<EVMError<Db::Error>> for Error<Db>
295+
where
296+
Db: Database,
297+
{
298+
fn from(e: EVMError<Db::Error>) -> Self {
299+
Self(e)
300+
}
301+
}
302+
303+
impl<Db: Database> core::error::Error for Error<Db> {}
304+
305+
impl<Db: Database> core::fmt::Debug for Error<Db> {
306+
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
307+
write!(f, "Error")
308+
}
309+
}
310+
311+
impl<Db: Database> core::fmt::Display for Error<Db> {
312+
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
313+
write!(f, "Error")
314+
}
315+
}
316+
317+
/// A simple evaluation function as a sane default.
318+
pub fn eval_fn(state: &ResultAndState) -> U256 {
319+
// log the transaction results
320+
match &state.result {
321+
ExecutionResult::Success { .. } => println!("Execution was successful."),
322+
ExecutionResult::Revert { .. } => println!("Execution reverted."),
323+
ExecutionResult::Halt { .. } => println!("Execution halted."),
324+
}
325+
326+
// return the target account balance
327+
let target_addr = address!("0x0000000000000000000000000000000000000000");
328+
let default_account = Account::default();
329+
let target_account = state.state.get(&target_addr).unwrap_or(&default_account);
330+
tracing::info!(balance = ?target_account.info.balance, "target account balance");
331+
332+
target_account.info.balance
333+
}

0 commit comments

Comments
 (0)