Skip to content

Commit 533d34e

Browse files
committed
updates the tx-poller to stream transactions
- shifts the tx-poller to a more actor oriented approach by streaming transactions out of the cache. - transaction deduplication and validation is intended to be carried out by the simulator.
1 parent 4f46318 commit 533d34e

File tree

1 file changed

+30
-7
lines changed

1 file changed

+30
-7
lines changed

src/tasks/tx_poller.rs

Lines changed: 30 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,39 +1,62 @@
1+
use std::sync::Arc;
2+
13
use alloy::consensus::TxEnvelope;
24
use eyre::Error;
35
use reqwest::{Client, Url};
46
use serde::{Deserialize, Serialize};
57
use serde_json::from_slice;
8+
use tokio::{sync::mpsc, task::JoinHandle};
69

710
pub use crate::config::BuilderConfig;
811

9-
/// Response from the tx-pool endpoint.
12+
/// Models a response from the transaction pool.
1013
#[derive(Debug, Clone, Serialize, Deserialize)]
1114
pub struct TxPoolResponse {
15+
/// Holds the transactions property as a list on the response.
1216
transactions: Vec<TxEnvelope>,
1317
}
1418

1519
/// Implements a poller for the block builder to pull transactions from the transaction pool.
16-
#[derive(Debug)]
20+
#[derive(Debug, Clone)]
1721
pub struct TxPoller {
18-
/// config for the builder
22+
/// Config values from the Builder.
1923
pub config: BuilderConfig,
20-
/// Reqwest client for fetching transactions from the tx-pool
24+
/// Reqwest Client for fetching transactions from the tx-pool.
2125
pub client: Client,
2226
}
2327

24-
/// TxPoller implements a poller that fetches unique transactions from the transaction pool.
28+
/// TxPoller implements a poller task that fetches unique transactions from the transaction pool.
2529
impl TxPoller {
26-
/// returns a new TxPoller with the given config.
30+
/// Returns a new TxPoller with the given config.
2731
pub fn new(config: &BuilderConfig) -> Self {
2832
Self { config: config.clone(), client: Client::new() }
2933
}
3034

31-
/// polls the tx-pool for unique transactions and evicts expired transactions.
35+
/// Polls the tx-pool for unique transactions and evicts expired transactions.
3236
/// unique transactions that haven't been seen before are sent into the builder pipeline.
3337
pub async fn check_tx_cache(&mut self) -> Result<Vec<TxEnvelope>, Error> {
3438
let url: Url = Url::parse(&self.config.tx_pool_url)?.join("transactions")?;
3539
let result = self.client.get(url).send().await?;
3640
let response: TxPoolResponse = from_slice(result.text().await?.as_bytes())?;
3741
Ok(response.transactions)
3842
}
43+
44+
/// Spawns a task that trawls the cache for transactions and sends along anything it finds
45+
pub fn spawn(mut self) -> (mpsc::UnboundedReceiver<Arc<TxEnvelope>>, JoinHandle<()>) {
46+
let (outbound, inbound) = mpsc::unbounded_channel();
47+
let jh = tokio::spawn(async move {
48+
loop {
49+
if let Ok(transactions) = self.check_tx_cache().await {
50+
tracing::debug!(count = ?transactions.len(), "found transactions");
51+
for tx in transactions.iter() {
52+
if let Err(err) = outbound.send(Arc::new(tx.clone())) {
53+
tracing::error!(err = ?err, "failed to send transaction outbound");
54+
}
55+
}
56+
}
57+
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
58+
}
59+
});
60+
(inbound, jh)
61+
}
3962
}

0 commit comments

Comments
 (0)