1
1
use crate :: {
2
2
config:: { BuilderConfig , WalletlessProvider } ,
3
- tasks:: {
4
- bundler:: { Bundle , BundlePoller } ,
5
- oauth:: Authenticator ,
6
- tx_poller:: TxPoller ,
7
- } ,
3
+ tasks:: bundler:: Bundle ,
8
4
} ;
9
5
use alloy:: { consensus:: TxEnvelope , eips:: BlockId , providers:: Provider } ;
10
6
use signet_sim:: { BlockBuild , BuiltBlock , SimCache , SimItem } ;
11
- use signet_types:: config:: SignetSystemConstants ;
7
+ use signet_types:: { SlotCalculator , config:: SignetSystemConstants } ;
8
+ use std:: {
9
+ sync:: Arc ,
10
+ time:: { Duration , Instant , SystemTime , UNIX_EPOCH } ,
11
+ } ;
12
12
use tokio:: {
13
13
select,
14
14
sync:: mpsc:: { self } ,
15
+ task:: JoinHandle ,
15
16
} ;
16
17
use trevm:: {
17
18
NoopBlock ,
@@ -23,9 +24,6 @@ use trevm::{
23
24
} ,
24
25
} ;
25
26
26
- /// Ethereum's slot time in seconds.
27
- pub const ETHEREUM_SLOT_TIME : u64 = 12 ;
28
-
29
27
/// Pecorino Chain ID
30
28
pub const PECORINO_CHAIN_ID : u64 = 14174 ;
31
29
@@ -37,42 +35,28 @@ pub struct BlockBuilder {
37
35
pub config : BuilderConfig ,
38
36
/// A provider that cannot sign transactions.
39
37
pub ru_provider : WalletlessProvider ,
40
- /// A poller for fetching transactions.
41
- pub tx_poller : TxPoller ,
42
- /// A poller for fetching bundles.
43
- pub bundle_poller : BundlePoller ,
38
+ /// The slot calculator for waking up and sleeping the builder correctly
39
+ pub slot_calculator : SlotCalculator ,
44
40
}
45
41
46
42
impl BlockBuilder {
47
- /// Create a new block builder with the given config .
43
+ /// Creates a new block builder that builds blocks based on the given provider .
48
44
pub fn new (
49
45
config : & BuilderConfig ,
50
- authenticator : Authenticator ,
51
46
ru_provider : WalletlessProvider ,
47
+ slot_calculator : SlotCalculator ,
52
48
) -> Self {
53
- Self {
54
- config : config. clone ( ) ,
55
- ru_provider,
56
- tx_poller : TxPoller :: new ( config) ,
57
- bundle_poller : BundlePoller :: new ( config, authenticator) ,
58
- }
49
+ Self { config : config. clone ( ) , ru_provider, slot_calculator }
59
50
}
60
51
61
- /// Spawn the block builder task, returning the inbound channel to it, and
62
- /// a handle to the running task.
52
+ /// Handles building a single block.
63
53
pub async fn handle_build (
64
54
& self ,
65
55
constants : SignetSystemConstants ,
66
- ru_provider : WalletlessProvider ,
67
56
sim_items : SimCache ,
57
+ finish_by : Instant ,
68
58
) -> Result < BuiltBlock , mpsc:: error:: SendError < BuiltBlock > > {
69
- let db = create_db ( ru_provider) . await . unwrap ( ) ;
70
-
71
- // TODO: add real slot calculator
72
- let finish_by = std:: time:: Instant :: now ( ) + std:: time:: Duration :: from_millis ( 200 ) ;
73
-
74
- dbg ! ( sim_items. read_best( 2 ) ) ;
75
- dbg ! ( sim_items. len( ) ) ;
59
+ let db = self . create_db ( ) . await . unwrap ( ) ;
76
60
77
61
let block_build: BlockBuild < _ , NoOpInspector > = BlockBuild :: new (
78
62
db,
@@ -90,47 +74,85 @@ impl BlockBuilder {
90
74
Ok ( block)
91
75
}
92
76
93
- /// Spawns a task that receives transactions and bundles from the pollers and
94
- /// adds them to the shared cache.
95
- pub async fn spawn_cache_task (
96
- & self ,
77
+ /// Scans the tx and bundle receivers for new items and adds them to the cache.
78
+ pub fn spawn_cache_handler (
79
+ self : Arc < Self > ,
97
80
mut tx_receiver : mpsc:: UnboundedReceiver < TxEnvelope > ,
98
81
mut bundle_receiver : mpsc:: UnboundedReceiver < Bundle > ,
99
82
cache : SimCache ,
100
- ) {
101
- loop {
102
- select ! {
103
- maybe_tx = tx_receiver. recv( ) => {
104
- if let Some ( tx) = maybe_tx {
105
- cache. add_item( SimItem :: Tx ( tx) ) ;
83
+ ) -> JoinHandle < ( ) > {
84
+ let jh = tokio:: spawn ( async move {
85
+ loop {
86
+ select ! {
87
+ maybe_tx = tx_receiver. recv( ) => {
88
+ if let Some ( tx) = maybe_tx {
89
+ cache. add_item( SimItem :: Tx ( tx) ) ;
90
+ }
91
+ }
92
+ maybe_bundle = bundle_receiver. recv( ) => {
93
+ if let Some ( bundle) = maybe_bundle {
94
+ cache. add_item( SimItem :: Bundle ( bundle. bundle) ) ;
95
+ }
106
96
}
107
97
}
108
- maybe_bundle = bundle_receiver. recv( ) => {
109
- if let Some ( bundle) = maybe_bundle {
110
- cache. add_item( SimItem :: Bundle ( bundle. bundle) ) ;
98
+ }
99
+ } ) ;
100
+ jh
101
+ }
102
+
103
+ /// Spawns the block building task.
104
+ pub fn spawn_builder_task (
105
+ self : Arc < Self > ,
106
+ constants : SignetSystemConstants ,
107
+ cache : SimCache ,
108
+ submit_sender : mpsc:: UnboundedSender < BuiltBlock > ,
109
+ ) -> JoinHandle < ( ) > {
110
+ let jh = tokio:: spawn ( async move {
111
+ loop {
112
+ let sim_cache = cache. clone ( ) ;
113
+
114
+ let finish_by = self . calculate_deadline ( ) ;
115
+ tracing:: info!( "simulating until target slot deadline" ) ;
116
+
117
+ // sleep until next wake period
118
+ tracing:: info!( "starting block build" ) ;
119
+ match self . handle_build ( constants, sim_cache, finish_by) . await {
120
+ Ok ( block) => {
121
+ let _ = submit_sender. send ( block) ;
122
+ }
123
+ Err ( e) => {
124
+ tracing:: error!( err = %e, "failed to send block" ) ;
125
+ continue ;
111
126
}
112
127
}
113
128
}
114
- }
129
+ } ) ;
130
+ jh
115
131
}
116
- }
117
132
118
- /// Creates an AlloyDB from a rollup provider
119
- async fn create_db ( ru_provider : WalletlessProvider ) -> Option < WrapAlloyDatabaseAsync > {
120
- let latest = match ru_provider. get_block_number ( ) . await {
121
- Ok ( block_number) => block_number,
122
- Err ( e) => {
123
- tracing:: error!( error = %e, "failed to get latest block number" ) ;
124
- println ! ( "failed to get latest block number" ) ;
125
- // Should this do anything else?
126
- return None ;
127
- }
128
- } ;
129
- let alloy_db = AlloyDB :: new ( ru_provider. clone ( ) , BlockId :: from ( latest) ) ;
130
- let wrapped_db = WrapDatabaseAsync :: new ( alloy_db) . unwrap_or_else ( || {
131
- panic ! ( "failed to acquire async alloy_db; check which runtime you're using" )
132
- } ) ;
133
- Some ( wrapped_db)
133
+ /// Returns the instant at which simulation must stop.
134
+ pub fn calculate_deadline ( & self ) -> Instant {
135
+ let now = SystemTime :: now ( ) ;
136
+ let unix_seconds = now. duration_since ( UNIX_EPOCH ) . expect ( "Time went backwards" ) . as_secs ( ) ;
137
+
138
+ Instant :: now ( ) . checked_add ( Duration :: from_secs ( unix_seconds) ) . unwrap ( )
139
+ }
140
+
141
+ /// Creates an AlloyDB from a rollup provider
142
+ async fn create_db ( & self ) -> Option < WrapAlloyDatabaseAsync > {
143
+ let latest = match self . ru_provider . get_block_number ( ) . await {
144
+ Ok ( block_number) => block_number,
145
+ Err ( e) => {
146
+ tracing:: error!( error = %e, "failed to get latest block number" ) ;
147
+ return None ;
148
+ }
149
+ } ;
150
+ let alloy_db = AlloyDB :: new ( self . ru_provider . clone ( ) , BlockId :: from ( latest) ) ;
151
+ let wrapped_db = WrapDatabaseAsync :: new ( alloy_db) . unwrap_or_else ( || {
152
+ panic ! ( "failed to acquire async alloy_db; check which runtime you're using" )
153
+ } ) ;
154
+ Some ( wrapped_db)
155
+ }
134
156
}
135
157
136
158
/// The wrapped alloy database type that is compatible with Db + DatabaseRef
0 commit comments