Skip to content

Commit 9c6c948

Browse files
committed
refactor: integrate env and cache tasks
1 parent 9be8e0f commit 9c6c948

File tree

10 files changed

+121
-224
lines changed

10 files changed

+121
-224
lines changed

Cargo.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@ integration = []
2727
[dependencies]
2828
init4-bin-base = "0.3"
2929

30-
signet-bundle = { git = "https://github.com/init4tech/signet-sdk", branch = "main" }
3130
signet-constants = { git = "https://github.com/init4tech/signet-sdk", branch = "main" }
3231
signet-sim = { git = "https://github.com/init4tech/signet-sdk", branch = "main" }
3332
signet-tx-cache = { git = "https://github.com/init4tech/signet-sdk", branch = "main" }

bin/builder.rs

Lines changed: 39 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -3,16 +3,17 @@ use builder::{
33
service::serve_builder,
44
tasks::{
55
block::sim::Simulator,
6-
cache::{BundlePoller, TxPoller},
6+
cache::{BundlePoller, CacheTask, TxPoller},
77
metrics::MetricsTask,
88
submit::SubmitTask,
99
},
1010
};
11-
use init4_bin_base::{deps::tracing, utils::from_env::FromEnv};
12-
use signet_sim::SimCache;
11+
use init4_bin_base::{
12+
deps::tracing::{info, info_span},
13+
utils::from_env::FromEnv,
14+
};
1315
use signet_types::constants::SignetSystemConstants;
1416
use tokio::select;
15-
use tracing::info_span;
1617

1718
// Note: Must be set to `multi_thread` to support async tasks.
1819
// See: https://docs.rs/tokio/latest/tokio/attr.main.html
@@ -21,74 +22,85 @@ async fn main() -> eyre::Result<()> {
2122
let _guard = init4_bin_base::init4();
2223
let init_span_guard = info_span!("builder initialization");
2324

25+
// Pull the configuration from the environment
2426
let config = BuilderConfig::from_env()?.clone();
2527
let constants = SignetSystemConstants::pecorino();
28+
29+
// Initialize the oauth token
2630
let token = config.oauth_token();
2731

32+
// Set up the EnvTask
33+
let env_task = config.env_task();
34+
let (env_watcher, env_jh) = env_task.spawn();
35+
36+
// Prep providers and contracts
2837
let (host_provider, quincey) =
2938
tokio::try_join!(config.connect_host_provider(), config.connect_quincey())?;
3039
let ru_provider = config.connect_ru_provider();
31-
3240
let zenith = config.connect_zenith(host_provider.clone());
3341

42+
// Set up the metrics task
3443
let metrics = MetricsTask { host_provider };
3544
let (tx_channel, metrics_jh) = metrics.spawn();
3645

46+
// Make a Tx submission task
3747
let submit =
3848
SubmitTask { zenith, quincey, config: config.clone(), outbound_tx_channel: tx_channel };
3949

50+
// Tx Poller pulls transactions from the cache
4051
let tx_poller = TxPoller::new(&config);
4152
let (tx_receiver, tx_poller_jh) = tx_poller.spawn();
4253

54+
// Bundle Poller pulls bundles from the cache
4355
let bundle_poller = BundlePoller::new(&config, token);
4456
let (bundle_receiver, bundle_poller_jh) = bundle_poller.spawn();
4557

46-
let (submit_channel, submit_jh) = submit.spawn();
47-
48-
let sim_items = SimCache::new();
49-
let slot_calculator = config.slot_calculator;
58+
// Set up the cache task
59+
let cache_task = CacheTask::new(env_watcher.clone(), bundle_receiver, tx_receiver);
60+
let (sim_cache, cache_jh) = cache_task.spawn();
5061

51-
let sim = Simulator::new(&config, ru_provider.clone(), slot_calculator);
52-
53-
let (basefee_jh, sim_cache_jh) =
54-
sim.spawn_cache_tasks(tx_receiver, bundle_receiver, sim_items.clone());
62+
// Set up tx submission
63+
let (submit_channel, submit_jh) = submit.spawn();
5564

56-
let build_jh = sim.spawn_simulator_task(constants, sim_items.clone(), submit_channel);
65+
// Set up the simulator
66+
let sim = Simulator::new(&config, ru_provider.clone());
67+
let build_jh = sim.spawn_simulator_task(constants, sim_cache, submit_channel);
5768

69+
// Start the healthcheck server
5870
let server = serve_builder(([0, 0, 0, 0], config.builder_port));
5971

6072
// We have finished initializing the builder, so we can drop the init span
6173
// guard.
6274
drop(init_span_guard);
6375

6476
select! {
77+
_ = cache_jh => {
78+
info!("cache task finished");
79+
},
80+
_ = env_jh => {
81+
info!("env task finished");
82+
},
6583
_ = tx_poller_jh => {
66-
tracing::info!("tx_poller finished");
84+
info!("tx_poller finished");
6785
},
6886
_ = bundle_poller_jh => {
69-
tracing::info!("bundle_poller finished");
87+
info!("bundle_poller finished");
7088
},
71-
_ = sim_cache_jh => {
72-
tracing::info!("sim cache task finished");
73-
}
74-
_ = basefee_jh => {
75-
tracing::info!("basefee task finished");
76-
}
7789
_ = submit_jh => {
78-
tracing::info!("submit finished");
90+
info!("submit finished");
7991
},
8092
_ = metrics_jh => {
81-
tracing::info!("metrics finished");
93+
info!("metrics finished");
8294
},
8395
_ = build_jh => {
84-
tracing::info!("build finished");
96+
info!("build finished");
8597
}
8698
_ = server => {
87-
tracing::info!("server finished");
99+
info!("server finished");
88100
}
89101
}
90102

91-
tracing::info!("shutting down");
103+
info!("shutting down");
92104

93105
Ok(())
94106
}

src/config.rs

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
11
use crate::{
22
quincey::Quincey,
33
signer::{LocalOrAws, SignerError},
4-
tasks::oauth::{Authenticator, SharedToken},
4+
tasks::{
5+
env::EnvTask,
6+
oauth::{Authenticator, SharedToken},
7+
},
58
};
69
use alloy::{
710
network::{Ethereum, EthereumWallet},
@@ -184,8 +187,13 @@ impl BuilderConfig {
184187

185188
/// Connect to the Rollup rpc provider.
186189
pub fn connect_ru_provider(&self) -> RootProvider<Ethereum> {
187-
let url = url::Url::parse(&self.ru_rpc_url).expect("failed to parse URL");
188-
RootProvider::<Ethereum>::new_http(url)
190+
static ONCE: std::sync::OnceLock<RootProvider<Ethereum>> = std::sync::OnceLock::new();
191+
192+
ONCE.get_or_init(|| {
193+
let url = url::Url::parse(&self.ru_rpc_url).expect("failed to parse URL");
194+
RootProvider::new_http(url)
195+
})
196+
.clone()
189197
}
190198

191199
/// Connect to the Host rpc provider.
@@ -240,4 +248,10 @@ impl BuilderConfig {
240248

241249
Ok(Quincey::new_remote(client, url, token))
242250
}
251+
252+
/// Create an [`EnvTask`] using this config.
253+
pub fn env_task(&self) -> EnvTask {
254+
let provider = self.connect_ru_provider();
255+
EnvTask::new(self.clone(), provider)
256+
}
243257
}

src/tasks/block/sim.rs

Lines changed: 10 additions & 138 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,9 @@
44
use super::cfg::PecorinoBlockEnv;
55
use crate::{
66
config::{BuilderConfig, RuProvider},
7-
tasks::{block::cfg::PecorinoCfg, cache::Bundle},
7+
tasks::block::cfg::PecorinoCfg,
88
};
99
use alloy::{
10-
consensus::TxEnvelope,
1110
eips::{BlockId, BlockNumberOrTag::Latest},
1211
network::Ethereum,
1312
providers::Provider,
@@ -20,18 +19,10 @@ use init4_bin_base::{
2019
};
2120
use signet_sim::{BlockBuild, BuiltBlock, SimCache};
2221
use signet_types::constants::SignetSystemConstants;
23-
use std::{
24-
sync::{
25-
Arc,
26-
atomic::{AtomicU64, Ordering},
27-
},
28-
time::{Duration, Instant, SystemTime, UNIX_EPOCH},
29-
};
22+
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
3023
use tokio::{
31-
select,
3224
sync::mpsc::{self},
3325
task::JoinHandle,
34-
time::sleep,
3526
};
3627
use trevm::revm::{
3728
database::{AlloyDB, WrapDatabaseAsync},
@@ -47,8 +38,6 @@ pub struct Simulator {
4738
pub config: BuilderConfig,
4839
/// A provider that cannot sign transactions, used for interacting with the rollup.
4940
pub ru_provider: RuProvider,
50-
/// The slot calculator for determining when to wake up and build blocks.
51-
pub slot_calculator: SlotCalculator,
5241
}
5342

5443
type AlloyDatabaseProvider = WrapDatabaseAsync<AlloyDB<Ethereum, RuProvider>>;
@@ -60,17 +49,17 @@ impl Simulator {
6049
///
6150
/// - `config`: The configuration for the builder.
6251
/// - `ru_provider`: A provider for interacting with the rollup.
63-
/// - `slot_calculator`: A slot calculator for managing block timing.
6452
///
6553
/// # Returns
6654
///
6755
/// A new `Simulator` instance.
68-
pub fn new(
69-
config: &BuilderConfig,
70-
ru_provider: RuProvider,
71-
slot_calculator: SlotCalculator,
72-
) -> Self {
73-
Self { config: config.clone(), ru_provider, slot_calculator }
56+
pub fn new(config: &BuilderConfig, ru_provider: RuProvider) -> Self {
57+
Self { config: config.clone(), ru_provider }
58+
}
59+
60+
/// Get the slot calculator.
61+
pub const fn slot_calculator(&self) -> &SlotCalculator {
62+
&self.config.slot_calculator
7463
}
7564

7665
/// Handles building a single block.
@@ -110,85 +99,6 @@ impl Simulator {
11099
Ok(block)
111100
}
112101

113-
/// Spawns two tasks: one to handle incoming transactions and bundles,
114-
/// adding them to the simulation cache, and one to track the latest basefee.
115-
///
116-
/// # Arguments
117-
///
118-
/// - `tx_receiver`: A channel receiver for incoming transactions.
119-
/// - `bundle_receiver`: A channel receiver for incoming bundles.
120-
/// - `cache`: The simulation cache to store the received items.
121-
///
122-
/// # Returns
123-
///
124-
/// A `JoinHandle` for the basefee updater and a `JoinHandle` for the
125-
/// cache handler.
126-
pub fn spawn_cache_tasks(
127-
&self,
128-
tx_receiver: mpsc::UnboundedReceiver<TxEnvelope>,
129-
bundle_receiver: mpsc::UnboundedReceiver<Bundle>,
130-
cache: SimCache,
131-
) -> (JoinHandle<()>, JoinHandle<()>) {
132-
debug!("starting up cache handler");
133-
134-
let basefee_price = Arc::new(AtomicU64::new(0_u64));
135-
let basefee_reader = Arc::clone(&basefee_price);
136-
let fut = self.basefee_updater_fut(basefee_price);
137-
138-
// Update the basefee on a per-block cadence
139-
let basefee_jh = tokio::spawn(fut);
140-
141-
// Update the sim cache whenever a transaction or bundle is received with respect to the basefee
142-
let cache_jh = tokio::spawn(async move {
143-
cache_updater(tx_receiver, bundle_receiver, cache, basefee_reader).await
144-
});
145-
146-
(basefee_jh, cache_jh)
147-
}
148-
149-
/// Periodically updates the shared basefee by querying the latest block.
150-
///
151-
/// This function calculates the remaining time until the next slot,
152-
/// sleeps until that time, and then retrieves the latest basefee from the rollup provider.
153-
/// The updated basefee is stored in the provided `AtomicU64`.
154-
///
155-
/// This function runs continuously.
156-
///
157-
/// # Arguments
158-
///
159-
/// - `price`: A shared `Arc<AtomicU64>` used to store the updated basefee value.
160-
fn basefee_updater_fut(&self, price: Arc<AtomicU64>) -> impl Future<Output = ()> + use<> {
161-
let slot_calculator = self.slot_calculator;
162-
let ru_provider = self.ru_provider.clone();
163-
164-
async move {
165-
debug!("starting basefee updater");
166-
loop {
167-
// calculate start of next slot plus a small buffer
168-
let time_remaining = slot_calculator.slot_duration()
169-
- slot_calculator.current_timepoint_within_slot()
170-
+ 1;
171-
debug!(time_remaining = ?time_remaining, "basefee updater sleeping until next slot");
172-
173-
// wait until that point in time
174-
sleep(Duration::from_secs(time_remaining)).await;
175-
176-
// update the basefee with that price
177-
let resp = ru_provider.get_block_by_number(Latest).await.inspect_err(|e| {
178-
error!(error = %e, "RPC error during basefee update");
179-
});
180-
181-
if let Ok(Some(block)) = resp {
182-
let basefee = block.header.base_fee_per_gas.unwrap_or(0);
183-
price.store(basefee, Ordering::Relaxed);
184-
debug!(basefee = basefee, "basefee updated");
185-
} else {
186-
warn!("get basefee failed - an error likely occurred");
187-
}
188-
}
189-
}
190-
}
191-
192102
/// Spawns the simulator task, which handles the setup and sets the deadline
193103
/// for the each round of simulation.
194104
///
@@ -269,7 +179,7 @@ impl Simulator {
269179
let now = SystemTime::now();
270180
let unix_seconds = now.duration_since(UNIX_EPOCH).expect("Time went backwards").as_secs();
271181
// Calculate the time remaining in the current slot
272-
let remaining = self.slot_calculator.calculate_timepoint_within_slot(unix_seconds);
182+
let remaining = self.slot_calculator().calculate_timepoint_within_slot(unix_seconds);
273183
// Deadline is equal to the start of the next slot plus the time remaining in this slot
274184
Instant::now() + Duration::from_secs(remaining)
275185
}
@@ -365,41 +275,3 @@ impl Simulator {
365275
Ok(block.header.base_fee_per_gas)
366276
}
367277
}
368-
369-
/// Continuously updates the simulation cache with incoming transactions and bundles.
370-
///
371-
/// This function listens for new transactions and bundles on their respective
372-
/// channels and adds them to the simulation cache using the latest observed basefee.
373-
///
374-
/// # Arguments
375-
///
376-
/// - `tx_receiver`: A receiver channel for incoming Ethereum transactions.
377-
/// - `bundle_receiver`: A receiver channel for incoming transaction bundles.
378-
/// - `cache`: The simulation cache used to store transactions and bundles.
379-
/// - `price_reader`: An `Arc<AtomicU64>` providing the latest basefee for simulation pricing.
380-
async fn cache_updater(
381-
mut tx_receiver: mpsc::UnboundedReceiver<
382-
alloy::consensus::EthereumTxEnvelope<alloy::consensus::TxEip4844Variant>,
383-
>,
384-
mut bundle_receiver: mpsc::UnboundedReceiver<Bundle>,
385-
cache: SimCache,
386-
price_reader: Arc<AtomicU64>,
387-
) -> ! {
388-
loop {
389-
let p = price_reader.load(Ordering::Relaxed);
390-
select! {
391-
maybe_tx = tx_receiver.recv() => {
392-
if let Some(tx) = maybe_tx {
393-
debug!(tx = ?tx.hash(), "received transaction");
394-
cache.add_item(tx, p);
395-
}
396-
}
397-
maybe_bundle = bundle_receiver.recv() => {
398-
if let Some(bundle) = maybe_bundle {
399-
debug!(bundle = ?bundle.id, "received bundle");
400-
cache.add_item(bundle.bundle, p);
401-
}
402-
}
403-
}
404-
}
405-
}

0 commit comments

Comments
 (0)