Skip to content

updates bundler to streaming actor pattern #61

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion src/tasks/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,6 @@ impl BlockBuilder {
error!(error = %e, "error polling bundles");
}
}
self.bundle_poller.evict();
}

/// Simulates a Zenith bundle against the rollup state
Expand Down
121 changes: 70 additions & 51 deletions src/tasks/bundler.rs
Original file line number Diff line number Diff line change
@@ -1,58 +1,63 @@
//! Bundler service responsible for managing bundles.
use super::oauth::Authenticator;

//! Bundler service responsible for fetching bundles and sending them to the simulator.
pub use crate::config::BuilderConfig;

use crate::tasks::oauth::Authenticator;
use oauth2::TokenResponse;
use reqwest::Url;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::time::{Duration, Instant};
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: couldn't we unify these imports?

use tokio::task::JoinHandle;
use tokio::time;
use tracing::{Instrument, debug, trace};
use zenith_types::ZenithEthBundle;

/// A bundle response from the tx-pool endpoint, containing a UUID and a
/// [`ZenithEthBundle`].
/// Holds a bundle from the cache with a unique ID and a Zenith bundle.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Bundle {
/// The bundle id (a UUID)
/// Cache identifier for the bundle
pub id: String,
/// The bundle itself
/// The Zenith bundle for this bundle
pub bundle: ZenithEthBundle,
}

/// Response from the tx-pool containing a list of bundles.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TxPoolBundleResponse {
/// the list of bundles
/// Bundle responses are available on the bundles property.
pub bundles: Vec<Bundle>,
}

/// The BundlePoller polls the tx-pool for bundles and manages the seen bundles.
#[derive(Debug)]
/// The BundlePoller polls the tx-pool for bundles.
#[derive(Debug, Clone)]
pub struct BundlePoller {
/// Configuration
/// The builder configuration values.
pub config: BuilderConfig,
/// [`Authenticator`] for fetching OAuth tokens
/// Authentication module that periodically fetches and stores auth tokens.
pub authenticator: Authenticator,
/// Already seen bundle UUIDs
pub seen_uuids: HashMap<String, Instant>,
/// Defines the interval at which the bundler polls the tx-pool for bundles.
pub poll_interval_ms: u64,
}

/// Implements a poller for the block builder to pull bundles from the tx cache.
/// Implements a poller for the block builder to pull bundles from the tx-pool.
impl BundlePoller {
/// Creates a new BundlePoller from the provided builder config.
pub fn new(config: &BuilderConfig, authenticator: Authenticator) -> Self {
Self { config: config.clone(), authenticator, seen_uuids: HashMap::new() }
Self { config: config.clone(), authenticator, poll_interval_ms: 1000 }
}

/// Fetches bundles from the transaction cache and returns the (oldest? random?) bundle in the cache.
pub async fn check_bundle_cache(&mut self) -> eyre::Result<Vec<Bundle>> {
let mut unique: Vec<Bundle> = Vec::new();
/// Creates a new BundlePoller from the provided builder config and with the specified poll interval in ms.
pub fn new_with_poll_interval_ms(
config: &BuilderConfig,
authenticator: Authenticator,
poll_interval_ms: u64,
) -> Self {
Self { config: config.clone(), authenticator, poll_interval_ms }
}

/// Fetches bundles from the transaction cache and returns them.
pub async fn check_bundle_cache(&mut self) -> eyre::Result<Vec<Bundle>> {
let bundle_url: Url = Url::parse(&self.config.tx_pool_url)?.join("bundles")?;
let token = self.authenticator.fetch_oauth_token().await?;

// Add the token to the request headers
let result = reqwest::Client::new()
.get(bundle_url)
.bearer_auth(token.access_token().secret())
Expand All @@ -61,38 +66,52 @@ impl BundlePoller {
.error_for_status()?;

let body = result.bytes().await?;
let bundles: TxPoolBundleResponse = serde_json::from_slice(&body)?;

bundles.bundles.iter().for_each(|bundle| {
self.check_seen_bundles(bundle.clone(), &mut unique);
});
let resp: TxPoolBundleResponse = serde_json::from_slice(&body)?;

Ok(unique)
Ok(resp.bundles)
}

/// Checks if the bundle has been seen before and if not, adds it to the unique bundles list.
fn check_seen_bundles(&mut self, bundle: Bundle, unique: &mut Vec<Bundle>) {
self.seen_uuids.entry(bundle.id.clone()).or_insert_with(|| {
// add to the set of unique bundles
unique.push(bundle.clone());
Instant::now() + Duration::from_secs(self.config.tx_pool_cache_duration)
});
}
async fn task_future(mut self, outbound: UnboundedSender<Bundle>) {
loop {
let span = tracing::debug_span!("BundlePoller::loop", url = %self.config.tx_pool_url);

/// Evicts expired bundles from the cache.
pub fn evict(&mut self) {
let expired_keys: Vec<String> = self
.seen_uuids
.iter()
.filter_map(
|(key, expiry)| {
if expiry.elapsed().is_zero() { Some(key.clone()) } else { None }
},
)
.collect();

for key in expired_keys {
self.seen_uuids.remove(&key);
// Enter the span for the next check.
let _guard = span.enter();

// Check this here to avoid making the web request if we know
// we don't need the results.
if outbound.is_closed() {
trace!("No receivers left, shutting down");
break;
}
// exit the span after the check.
drop(_guard);

match self.check_bundle_cache().instrument(span.clone()).await {
Ok(bundles) => {
tracing::debug!(count = ?bundles.len(), "found bundles");
for bundle in bundles.into_iter() {
if let Err(err) = outbound.send(bundle) {
tracing::error!(err = ?err, "Failed to send bundle - channel is dropped");
}
}
}
// If fetching was an error, we log and continue. We expect
// these to be transient network issues.
Err(e) => {
debug!(error = %e, "Error fetching bundles");
}
}
time::sleep(time::Duration::from_millis(self.poll_interval_ms)).await;
}
}

/// Spawns a task that sends bundles it finds to its channel sender.
pub fn spawn(self) -> (UnboundedReceiver<Bundle>, JoinHandle<()>) {
let (outbound, inbound) = unbounded_channel();

let jh = tokio::spawn(self.task_future(outbound));

(inbound, jh)
}
}