Skip to content

fix: various improvements to bundler tasks #68

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 35 additions & 9 deletions src/tasks/bundler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@ use crate::tasks::oauth::Authenticator;
use oauth2::TokenResponse;
use reqwest::Url;
use serde::{Deserialize, Serialize};
use tokio::sync::mpsc::{UnboundedReceiver, unbounded_channel};
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
use tokio::task::JoinHandle;
use tokio::time;
use tracing::{Instrument, debug, trace};
use zenith_types::ZenithEthBundle;

/// Holds a bundle from the cache with a unique ID and a Zenith bundle.
Expand Down Expand Up @@ -69,22 +71,46 @@ impl BundlePoller {
Ok(resp.bundles)
}

/// Spawns a task that sends bundles it finds to its channel sender.
pub fn spawn(mut self) -> (UnboundedReceiver<Bundle>, JoinHandle<()>) {
let (outbound, inbound) = unbounded_channel();
let jh = tokio::spawn(async move {
loop {
if let Ok(bundles) = self.check_bundle_cache().await {
async fn task_future(mut self, outbound: UnboundedSender<Bundle>) {
loop {
let span = tracing::debug_span!("BundlePoller::loop", url = %self.config.tx_pool_url);

// Enter the span for the next check.
let _guard = span.enter();

// Check this here to avoid making the web request if we know
// we don't need the results.
if outbound.is_closed() {
trace!("No receivers left, shutting down");
break;
}
// exit the span after the check.
drop(_guard);

match self.check_bundle_cache().instrument(span.clone()).await {
Ok(bundles) => {
tracing::debug!(count = ?bundles.len(), "found bundles");
for bundle in bundles.into_iter() {
if let Err(err) = outbound.send(bundle) {
tracing::error!(err = ?err, "Failed to send bundle - channel is dropped");
}
}
}
tokio::time::sleep(tokio::time::Duration::from_millis(self.poll_interval_ms)).await;
// If fetching was an error, we log and continue. We expect
// these to be transient network issues.
Err(e) => {
debug!(error = %e, "Error fetching bundles");
}
}
});
time::sleep(time::Duration::from_millis(self.poll_interval_ms)).await;
}
}

/// Spawns a task that sends bundles it finds to its channel sender.
pub fn spawn(self) -> (UnboundedReceiver<Bundle>, JoinHandle<()>) {
let (outbound, inbound) = unbounded_channel();

let jh = tokio::spawn(self.task_future(outbound));

(inbound, jh)
}
Expand Down
Loading