Skip to content

updates the tx-poller to stream transactions #70

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 14 commits into from
Closed
20 changes: 15 additions & 5 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,21 @@ name = "transaction-submitter"
path = "bin/submit_transaction.rs"

[dependencies]
init4-bin-base = "0.1.0"

zenith-types = "0.13"

alloy = { version = "0.7.3", features = ["full", "json-rpc", "signer-aws", "rpc-types-mev", "rlp"] }
init4-bin-base = { git = "https://github.com/init4tech/bin-base.git" }

signet-bundle = { git = "https://github.com/init4tech/signet-sdk", branch = "main" }
signet-types = { git = "https://github.com/init4tech/signet-sdk", branch = "main" }
signet-zenith = { git = "https://github.com/init4tech/signet-sdk", branch = "main" }

alloy = { version = "0.12.6", features = [
"full",
"json-rpc",
"signer-aws",
"rpc-types-mev",
"rlp",
"node-bindings",
"serde",
] }

aws-config = "1.1.7"
aws-sdk-kms = "1.15.0"
Expand Down
3 changes: 1 addition & 2 deletions bin/submit_transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,8 @@ async fn connect_from_config() -> (Provider, Address, u64) {
let signer = AwsSigner::new(client, kms_key_id.to_string(), Some(chain_id)).await.unwrap();

let provider = ProviderBuilder::new()
.with_recommended_fillers()
.wallet(EthereumWallet::from(signer))
.on_builtin(&rpc_url)
.connect(&rpc_url)
.await
.unwrap();

Expand Down
48 changes: 24 additions & 24 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,9 @@ use alloy::{
WalletFiller,
},
},
transports::BoxTransport,
};
use signet_zenith::Zenith;
use std::{borrow::Cow, env, num, str::FromStr};
use zenith_types::Zenith;

// Keys for .env variables that need to be set to configure the builder.
const HOST_CHAIN_ID: &str = "HOST_CHAIN_ID";
Expand Down Expand Up @@ -125,7 +124,7 @@ impl ConfigError {
}
}

/// Provider type used to read & write.
/// Type alias for the provider used in the builder.
pub type Provider = FillProvider<
JoinFill<
JoinFill<
Expand All @@ -134,26 +133,22 @@ pub type Provider = FillProvider<
>,
WalletFiller<EthereumWallet>,
>,
RootProvider<BoxTransport>,
BoxTransport,
RootProvider,
Ethereum,
>;

/// Provider type used to read-only.
/// Type alias for the provider used in the builder, without a wallet.
pub type WalletlessProvider = FillProvider<
JoinFill<
Identity,
JoinFill<GasFiller, JoinFill<BlobGasFiller, JoinFill<NonceFiller, ChainIdFiller>>>,
>,
RootProvider<BoxTransport>,
BoxTransport,
RootProvider,
Ethereum,
>;

/// A Zenith contract instance, using some provider `P` (defaults to
/// [`Provider`]).
pub type ZenithInstance<P = Provider> =
Zenith::ZenithInstance<BoxTransport, P, alloy::network::Ethereum>;
/// A [`Zenith`] contract instance using [`Provider`] as the provider.
pub type ZenithInstance<P = Provider> = Zenith::ZenithInstance<(), P, alloy::network::Ethereum>;

impl BuilderConfig {
/// Load the builder configuration from environment variables.
Expand Down Expand Up @@ -210,32 +205,36 @@ impl BuilderConfig {

/// Connect to the Rollup rpc provider.
pub async fn connect_ru_provider(&self) -> Result<WalletlessProvider, ConfigError> {
ProviderBuilder::new()
.with_recommended_fillers()
.on_builtin(&self.ru_rpc_url)
let provider = ProviderBuilder::new()
.connect(&self.ru_rpc_url)
.await
.map_err(Into::into)
.map_err(ConfigError::Provider)?;

Ok(provider)
}

/// Connect to the Host rpc provider.
pub async fn connect_host_provider(&self) -> Result<Provider, ConfigError> {
let builder_signer = self.connect_builder_signer().await?;
ProviderBuilder::new()
.with_recommended_fillers()
let provider = ProviderBuilder::new()
.wallet(EthereumWallet::from(builder_signer))
.on_builtin(&self.host_rpc_url)
.connect(&self.host_rpc_url)
.await
.map_err(Into::into)
.map_err(ConfigError::Provider)?;

Ok(provider)
}

/// Connect additional broadcast providers.
pub async fn connect_additional_broadcast(
&self,
) -> Result<Vec<RootProvider<BoxTransport>>, ConfigError> {
let mut providers = Vec::with_capacity(self.tx_broadcast_urls.len());
) -> Result<Vec<WalletlessProvider>, ConfigError> {
let mut providers: Vec<WalletlessProvider> =
Vec::with_capacity(self.tx_broadcast_urls.len());
for url in self.tx_broadcast_urls.iter() {
let provider =
ProviderBuilder::new().on_builtin(url).await.map_err(Into::<ConfigError>::into)?;
ProviderBuilder::new().connect(url).await.map_err(ConfigError::Provider)?;

providers.push(provider);
}
Ok(providers)
Expand Down Expand Up @@ -278,5 +277,6 @@ pub fn load_url(key: &str) -> Result<Cow<'static, str>, ConfigError> {
/// Load an address from an environment variable.
pub fn load_address(key: &str) -> Result<Address, ConfigError> {
let address = load_string(key)?;
Address::from_str(&address).map_err(Into::into)
Address::from_str(&address)
.map_err(|_| ConfigError::Var(format!("Invalid address format for {}", key)))
}
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,5 @@ pub mod tasks;
/// Utilities.
pub mod utils;

// Anonymous import suppresses warnings about unused imports.
use openssl as _;
12 changes: 6 additions & 6 deletions src/tasks/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,18 @@ use super::oauth::Authenticator;
use super::tx_poller::TxPoller;
use crate::config::{BuilderConfig, WalletlessProvider};
use alloy::{
consensus::{SidecarBuilder, SidecarCoder, TxEnvelope},
consensus::{SidecarBuilder, SidecarCoder, transaction::TxEnvelope},
eips::eip2718::Decodable2718,
primitives::{B256, Bytes, keccak256},
providers::Provider as _,
rlp::Buf,
};
use signet_bundle::SignetEthBundle;
use signet_zenith::{Alloy2718Coder, encode_txns};
use std::time::{SystemTime, UNIX_EPOCH};
use std::{sync::OnceLock, time::Duration};
use tokio::{sync::mpsc, task::JoinHandle};
use tracing::{Instrument, debug, error, info, trace};
use zenith_types::{Alloy2718Coder, ZenithEthBundle, encode_txns};

/// Ethereum's slot time in seconds.
pub const ETHEREUM_SLOT_TIME: u64 = 12;
Expand Down Expand Up @@ -180,11 +181,10 @@ impl BlockBuilder {
error!(error = %e, "error polling bundles");
}
}
self.bundle_poller.evict();
}

/// Simulates a Zenith bundle against the rollup state
async fn simulate_bundle(&mut self, bundle: &ZenithEthBundle) -> eyre::Result<()> {
async fn simulate_bundle(&mut self, bundle: &SignetEthBundle) -> eyre::Result<()> {
// TODO: Simulate bundles with the Simulation Engine
// [ENG-672](https://linear.app/initiates/issue/ENG-672/add-support-for-bundles)
debug!(hash = ?bundle.bundle.bundle_hash(), block_number = ?bundle.block_number(), "bundle simulations is not implemented yet - skipping simulation");
Expand Down Expand Up @@ -270,7 +270,7 @@ mod tests {
rpc::types::{TransactionRequest, mev::EthSendBundle},
signers::local::PrivateKeySigner,
};
use zenith_types::ZenithEthBundle;
use signet_bundle::SignetEthBundle;

/// Create a mock bundle for testing with a single transaction
async fn create_mock_bundle(wallet: &EthereumWallet) -> Bundle {
Expand All @@ -295,7 +295,7 @@ mod tests {
replacement_uuid: Some("replacement_uuid".to_owned()),
};

let zenith_bundle = ZenithEthBundle { bundle: eth_bundle, host_fills: None };
let zenith_bundle = SignetEthBundle { bundle: eth_bundle, host_fills: None };

Bundle { id: "mock_bundle".to_owned(), bundle: zenith_bundle }
}
Expand Down
124 changes: 71 additions & 53 deletions src/tasks/bundler.rs
Original file line number Diff line number Diff line change
@@ -1,58 +1,62 @@
//! Bundler service responsible for managing bundles.
use super::oauth::Authenticator;

//! Bundler service responsible for fetching bundles and sending them to the simulator.
pub use crate::config::BuilderConfig;

use crate::tasks::oauth::Authenticator;
use oauth2::TokenResponse;
use reqwest::Url;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::time::{Duration, Instant};
use zenith_types::ZenithEthBundle;

/// A bundle response from the tx-pool endpoint, containing a UUID and a
/// [`ZenithEthBundle`].
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
use tokio::task::JoinHandle;
use tokio::time;
use tracing::{Instrument, debug, trace};
use signet_bundle::SignetEthBundle;
/// Holds a bundle from the cache with a unique ID and a Zenith bundle.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Bundle {
/// The bundle id (a UUID)
/// Cache identifier for the bundle
pub id: String,
/// The bundle itself
pub bundle: ZenithEthBundle,
/// The Zenith bundle for this bundle
pub bundle: SignetEthBundle,
}

/// Response from the tx-pool containing a list of bundles.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TxPoolBundleResponse {
/// the list of bundles
/// Bundle responses are available on the bundles property.
pub bundles: Vec<Bundle>,
}

/// The BundlePoller polls the tx-pool for bundles and manages the seen bundles.
#[derive(Debug)]
/// The BundlePoller polls the tx-pool for bundles.
#[derive(Debug, Clone)]
pub struct BundlePoller {
/// Configuration
/// The builder configuration values.
pub config: BuilderConfig,
/// [`Authenticator`] for fetching OAuth tokens
/// Authentication module that periodically fetches and stores auth tokens.
pub authenticator: Authenticator,
/// Already seen bundle UUIDs
pub seen_uuids: HashMap<String, Instant>,
/// Defines the interval at which the bundler polls the tx-pool for bundles.
pub poll_interval_ms: u64,
}

/// Implements a poller for the block builder to pull bundles from the tx cache.
/// Implements a poller for the block builder to pull bundles from the tx-pool.
impl BundlePoller {
/// Creates a new BundlePoller from the provided builder config.
pub fn new(config: &BuilderConfig, authenticator: Authenticator) -> Self {
Self { config: config.clone(), authenticator, seen_uuids: HashMap::new() }
Self { config: config.clone(), authenticator, poll_interval_ms: 1000 }
}

/// Fetches bundles from the transaction cache and returns the (oldest? random?) bundle in the cache.
pub async fn check_bundle_cache(&mut self) -> eyre::Result<Vec<Bundle>> {
let mut unique: Vec<Bundle> = Vec::new();
/// Creates a new BundlePoller from the provided builder config and with the specified poll interval in ms.
pub fn new_with_poll_interval_ms(
config: &BuilderConfig,
authenticator: Authenticator,
poll_interval_ms: u64,
) -> Self {
Self { config: config.clone(), authenticator, poll_interval_ms }
}

/// Fetches bundles from the transaction cache and returns them.
pub async fn check_bundle_cache(&mut self) -> eyre::Result<Vec<Bundle>> {
let bundle_url: Url = Url::parse(&self.config.tx_pool_url)?.join("bundles")?;
let token = self.authenticator.fetch_oauth_token().await?;

// Add the token to the request headers
let result = reqwest::Client::new()
.get(bundle_url)
.bearer_auth(token.access_token().secret())
Expand All @@ -61,38 +65,52 @@ impl BundlePoller {
.error_for_status()?;

let body = result.bytes().await?;
let bundles: TxPoolBundleResponse = serde_json::from_slice(&body)?;

bundles.bundles.iter().for_each(|bundle| {
self.check_seen_bundles(bundle.clone(), &mut unique);
});
let resp: TxPoolBundleResponse = serde_json::from_slice(&body)?;

Ok(unique)
Ok(resp.bundles)
}

/// Checks if the bundle has been seen before and if not, adds it to the unique bundles list.
fn check_seen_bundles(&mut self, bundle: Bundle, unique: &mut Vec<Bundle>) {
self.seen_uuids.entry(bundle.id.clone()).or_insert_with(|| {
// add to the set of unique bundles
unique.push(bundle.clone());
Instant::now() + Duration::from_secs(self.config.tx_pool_cache_duration)
});
}
async fn task_future(mut self, outbound: UnboundedSender<Bundle>) {
loop {
let span = tracing::debug_span!("BundlePoller::loop", url = %self.config.tx_pool_url);

// Enter the span for the next check.
let _guard = span.enter();

/// Evicts expired bundles from the cache.
pub fn evict(&mut self) {
let expired_keys: Vec<String> = self
.seen_uuids
.iter()
.filter_map(
|(key, expiry)| {
if expiry.elapsed().is_zero() { Some(key.clone()) } else { None }
},
)
.collect();
// Check this here to avoid making the web request if we know
// we don't need the results.
if outbound.is_closed() {
trace!("No receivers left, shutting down");
break;
}
// exit the span after the check.
drop(_guard);

for key in expired_keys {
self.seen_uuids.remove(&key);
match self.check_bundle_cache().instrument(span.clone()).await {
Ok(bundles) => {
tracing::debug!(count = ?bundles.len(), "found bundles");
for bundle in bundles.into_iter() {
if let Err(err) = outbound.send(bundle) {
tracing::error!(err = ?err, "Failed to send bundle - channel is dropped");
}
}
}
// If fetching was an error, we log and continue. We expect
// these to be transient network issues.
Err(e) => {
debug!(error = %e, "Error fetching bundles");
}
}
time::sleep(time::Duration::from_millis(self.poll_interval_ms)).await;
}
}

/// Spawns a task that sends bundles it finds to its channel sender.
pub fn spawn(self) -> (UnboundedReceiver<Bundle>, JoinHandle<()>) {
let (outbound, inbound) = unbounded_channel();

let jh = tokio::spawn(self.task_future(outbound));

(inbound, jh)
}
}
Loading
Loading