Skip to content

Commit ae48e92

Browse files
dylanlottprestwich
authored andcommitted
refactors: submit channel refactors
1 parent dda4506 commit ae48e92

File tree

1 file changed

+187
-45
lines changed

1 file changed

+187
-45
lines changed

src/tasks/submit.rs

Lines changed: 187 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use crate::{
44
utils::extract_signature_components,
55
};
66
use alloy::{
7-
consensus::{SimpleCoder, constants::GWEI_TO_WEI},
7+
consensus::{SimpleCoder, Transaction, constants::GWEI_TO_WEI},
88
eips::BlockNumberOrTag,
99
network::{TransactionBuilder, TransactionBuilder4844},
1010
primitives::{FixedBytes, TxHash, U256},
@@ -22,9 +22,9 @@ use signet_sim::BuiltBlock;
2222
use signet_types::{SignRequest, SignResponse};
2323
use signet_zenith::{
2424
BundleHelper::{self, BlockHeader, FillPermit2, submitCall},
25-
Zenith::IncorrectHostBlock,
25+
Zenith::{self, IncorrectHostBlock},
2626
};
27-
use std::time::Instant;
27+
use std::time::{Instant, UNIX_EPOCH};
2828
use tokio::{sync::mpsc, task::JoinHandle};
2929

3030
macro_rules! spawn_provider_send {
@@ -53,17 +53,14 @@ pub enum ControlFlow {
5353
}
5454

5555
/// Submits sidecars in ethereum txns to mainnet ethereum
56-
#[derive(Debug)]
56+
#[derive(Debug, Clone)]
5757
pub struct SubmitTask {
5858
/// Zenith
5959
pub zenith: ZenithInstance,
60-
6160
/// Quincey
6261
pub quincey: Quincey,
63-
6462
/// Config
6563
pub config: crate::config::BuilderConfig,
66-
6764
/// Channel over which to send pending transactions
6865
pub outbound_tx_channel: mpsc::UnboundedSender<TxHash>,
6966
}
@@ -99,47 +96,69 @@ impl SubmitTask {
9996
v: u8,
10097
r: FixedBytes<32>,
10198
s: FixedBytes<32>,
102-
in_progress: &BuiltBlock,
99+
block: &BuiltBlock,
103100
) -> eyre::Result<TransactionRequest> {
104101
let data = submitCall { fills, header, v, r, s }.abi_encode();
105102

106-
let sidecar = in_progress.encode_blob::<SimpleCoder>().build()?;
103+
let sidecar = block.encode_blob::<SimpleCoder>().build()?;
107104
Ok(TransactionRequest::default()
108105
.with_blob_sidecar(sidecar)
109106
.with_input(data)
110107
.with_max_priority_fee_per_gas((GWEI_TO_WEI * 16) as u128))
111108
}
112109

113-
/// Returns the next host block height
110+
/// Returns the next host block height.
114111
async fn next_host_block_height(&self) -> eyre::Result<u64> {
115112
let result = self.provider().get_block_number().await?;
116113
let next = result.checked_add(1).ok_or_else(|| eyre!("next host block height overflow"))?;
117114
Ok(next)
118115
}
119116

120-
/// Submits the EIP 4844 transaction to the network
117+
/// Prepares and then sends the EIP-4844 transaction with a sidecar encoded with a rollup block to the network.
121118
async fn submit_transaction(
122119
&self,
120+
retry_count: usize,
123121
resp: &SignResponse,
124-
in_progress: &BuiltBlock,
122+
block: &BuiltBlock,
125123
) -> eyre::Result<ControlFlow> {
124+
let tx = self.prepare_tx(retry_count, resp, block).await?;
125+
126+
self.send_transaction(resp, tx).await
127+
}
128+
129+
/// Prepares the transaction by extracting the signature components, creating the transaction
130+
/// request, and simulating the transaction with a call to the host provider.
131+
async fn prepare_tx(
132+
&self,
133+
retry_count: usize,
134+
resp: &SignResponse,
135+
block: &BuiltBlock,
136+
) -> Result<TransactionRequest, eyre::Error> {
137+
// Extract the signature components from the response
126138
let (v, r, s) = extract_signature_components(&resp.sig);
127139

128-
let header = BlockHeader {
129-
hostBlockNumber: resp.req.host_block_number,
130-
rollupChainId: U256::from(self.config.ru_chain_id),
131-
gasLimit: resp.req.gas_limit,
132-
rewardAddress: resp.req.ru_reward_address,
133-
blockDataHash: *in_progress.contents_hash(),
134-
};
140+
// Create the transaction request with the signature values
141+
let tx: TransactionRequest = self.tx_request(retry_count, resp, block, v, r, s).await?;
135142

136-
let fills = vec![]; // NB: ignored until fills are implemented
137-
let tx = self
138-
.build_blob_tx(fills, header, v, r, s, in_progress)?
139-
.with_from(self.provider().default_signer_address())
140-
.with_to(self.config.builder_helper_address)
141-
.with_gas_limit(1_000_000);
143+
// Simulate the transaction with a call to the host provider
144+
if let Some(maybe_error) = self.sim_with_call(&tx).await {
145+
warn!(
146+
error = ?maybe_error,
147+
"error in transaction simulation"
148+
);
149+
if let Err(e) = maybe_error {
150+
return Err(e);
151+
}
152+
}
153+
154+
Ok(tx)
155+
}
142156

157+
/// Simulates the transaction with a call to the host provider to check for reverts.
158+
async fn sim_with_call(
159+
&self,
160+
tx: &TransactionRequest,
161+
) -> Option<Result<ControlFlow, eyre::Error>> {
143162
if let Err(TransportError::ErrorResp(e)) =
144163
self.provider().call(tx.clone()).block(BlockNumberOrTag::Pending.into()).await
145164
{
@@ -154,16 +173,79 @@ impl SubmitTask {
154173
.map(|data| data.starts_with(&IncorrectHostBlock::SELECTOR))
155174
.unwrap_or_default()
156175
{
157-
return Ok(ControlFlow::Retry);
176+
debug!(%e, "incorrect host block");
177+
return Some(Ok(ControlFlow::Retry));
158178
}
159179

160-
return Ok(ControlFlow::Skip);
180+
if e.as_revert_data()
181+
.map(|data| data.starts_with(&Zenith::BadSignature::SELECTOR))
182+
.unwrap_or_default()
183+
{
184+
debug!(%e, "bad signature");
185+
return Some(Ok(ControlFlow::Skip));
186+
}
187+
188+
if e.as_revert_data()
189+
.map(|data| data.starts_with(&Zenith::OneRollupBlockPerHostBlock::SELECTOR))
190+
.unwrap_or_default()
191+
{
192+
debug!(%e, "one rollup block per host block");
193+
return Some(Ok(ControlFlow::Skip));
194+
}
195+
196+
return Some(Ok(ControlFlow::Skip));
161197
}
162198

163-
// All validation checks have passed, send the transaction
164-
self.send_transaction(resp, tx).await
199+
debug!(?tx, "successfully simulated transaction request");
200+
None
165201
}
166202

203+
/// Creates a transaction request for the blob with the given header and signature values.
204+
async fn tx_request(
205+
&self,
206+
retry_count: usize,
207+
resp: &SignResponse,
208+
block: &BuiltBlock,
209+
v: u8,
210+
r: FixedBytes<32>,
211+
s: FixedBytes<32>,
212+
) -> Result<TransactionRequest, eyre::Error> {
213+
// TODO: ENG-1082 Implement fills
214+
let fills = vec![];
215+
216+
// Bump gas with each retry to replace the previous transaction while maintaining the same nonce
217+
let gas_coefficient = 10 * (retry_count + 1) as u64;
218+
let gas_limit = 1_000_000 + (gas_coefficient * 1_000_000);
219+
debug!(retry_count, gas_coefficient, gas_limit, "calculated gas limit");
220+
221+
// Build the block header
222+
let header: BlockHeader = BlockHeader {
223+
hostBlockNumber: resp.req.host_block_number,
224+
rollupChainId: U256::from(self.config.ru_chain_id),
225+
gasLimit: resp.req.gas_limit,
226+
rewardAddress: resp.req.ru_reward_address,
227+
blockDataHash: *block.contents_hash(),
228+
};
229+
debug!(?header, "built block header");
230+
231+
// manually retrieve nonce
232+
let nonce =
233+
self.provider().get_transaction_count(self.provider().default_signer_address()).await?;
234+
debug!(nonce, "manually setting transaction nonce");
235+
236+
// Create a blob transaction with the blob header and signature values and return it
237+
let tx = self
238+
.build_blob_tx(fills, header, v, r, s, block)?
239+
.with_from(self.provider().default_signer_address())
240+
.with_to(self.config.builder_helper_address)
241+
.with_gas_limit(gas_limit);
242+
243+
debug!(?tx, "prepared transaction request");
244+
Ok(tx)
245+
}
246+
247+
/// Fills the transaction request with the provider and sends it to the network
248+
/// and any additionally configured broadcast providers.
167249
async fn send_transaction(
168250
&self,
169251
resp: &SignResponse,
@@ -172,17 +254,21 @@ impl SubmitTask {
172254
debug!(
173255
host_block_number = %resp.req.host_block_number,
174256
gas_limit = %resp.req.gas_limit,
257+
nonce = ?tx.nonce,
175258
"sending transaction to network"
176259
);
260+
177261

262+
// assign the nonce and fill the rest of the values
178263
let SendableTx::Envelope(tx) = self.provider().fill(tx).await? else {
179264
bail!("failed to fill transaction")
180265
};
266+
debug!(tx_hash = %tx.tx_hash(), nonce = ?tx.nonce(), gas_limit = ?tx.gas_limit(), blob_gas_used = ?tx.blob_gas_used(), "filled blob transaction");
181267

182-
// Send the tx via the primary host_provider
268+
// send the tx via the primary host_provider
183269
let fut = spawn_provider_send!(self.provider(), &tx);
184270

185-
// Spawn send_tx futures for all additional broadcast host_providers
271+
// spawn send_tx futures for all additional broadcast host_providers
186272
for host_provider in self.config.connect_additional_broadcast() {
187273
spawn_provider_send!(&host_provider, &tx);
188274
}
@@ -209,9 +295,14 @@ impl SubmitTask {
209295
Ok(ControlFlow::Done)
210296
}
211297

298+
/// Handles the inbound block by constructing a signature request and submitting the transaction.
212299
#[instrument(skip_all)]
213-
async fn handle_inbound(&self, block: &BuiltBlock) -> eyre::Result<ControlFlow> {
214-
info!(txns = block.tx_count(), "handling inbound block");
300+
async fn handle_inbound(
301+
&self,
302+
retry_count: usize,
303+
block: &BuiltBlock,
304+
) -> eyre::Result<ControlFlow> {
305+
info!(retry_count, txns = block.tx_count(), "handling inbound block");
215306
let Ok(sig_request) = self.construct_sig_request(block).await.inspect_err(|e| {
216307
error!(error = %e, "error constructing signature request");
217308
}) else {
@@ -226,47 +317,76 @@ impl SubmitTask {
226317

227318
let signed = self.quincey.get_signature(&sig_request).await?;
228319

229-
self.submit_transaction(&signed, block).await
320+
self.submit_transaction(retry_count, &signed, block).await
230321
}
231322

323+
/// Handles the retry logic for the inbound block.
232324
async fn retrying_handle_inbound(
233325
&self,
234326
block: &BuiltBlock,
235327
retry_limit: usize,
236328
) -> eyre::Result<ControlFlow> {
237329
let mut retries = 0;
238330
let building_start_time = Instant::now();
331+
let (current_slot, start, end) = self.calculate_slot_window()?;
332+
debug!(current_slot, start, end, "calculating target slot window");
239333

334+
// Retry loop
240335
let result = loop {
241336
let span = debug_span!("SubmitTask::retrying_handle_inbound", retries);
337+
debug!(retries, "number of retries");
242338

243-
let result =
244-
self.handle_inbound(block).instrument(span.clone()).await.inspect_err(|e| {
245-
error!(error = %e, "error handling inbound block");
246-
})?;
339+
let inbound_result = match self.handle_inbound(retries, block).instrument(span.clone()).await {
340+
Ok(control_flow) => {
341+
debug!(?control_flow, retries, "successfully handled inbound block");
342+
control_flow
343+
}
344+
Err(err) => {
345+
retries += 1;
346+
error!(error = %err, "error handling inbound block");
347+
348+
if err.to_string().contains("403") {
349+
debug!("403 error - skipping block");
350+
let (slot_number, start, end) = self.calculate_slot_window()?;
351+
debug!(slot_number, start, end, "403 sleep until skipping block");
352+
// TODO: Sleep until the end of the next slot and return retry
353+
return Ok(ControlFlow::Done);
354+
}
355+
ControlFlow::Retry
356+
}
357+
};
247358

248359
let guard = span.entered();
249360

250-
match result {
361+
match inbound_result {
251362
ControlFlow::Retry => {
252-
retries += 1;
253363
if retries > retry_limit {
254364
counter!("builder.building_too_many_retries").increment(1);
365+
debug!("retries exceeded - skipping block");
255366
return Ok(ControlFlow::Skip);
256367
}
257-
error!("error handling inbound block: retrying");
258368
drop(guard);
259-
tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
260369

370+
// Detect a slot change and break out of the loop in that case too
371+
let (this_slot, start, end) = self.calculate_slot_window()?;
372+
if this_slot != current_slot {
373+
debug!("slot changed - skipping block");
374+
break inbound_result;
375+
}
376+
377+
// Otherwise retry the block
378+
debug!(retries, this_slot, start, end, "retrying block");
261379
continue;
262380
}
263381
ControlFlow::Skip => {
264382
counter!("builder.skipped_blocks").increment(1);
265-
break result;
383+
debug!(retries, "skipping block");
384+
break inbound_result;
266385
}
267386
ControlFlow::Done => {
268387
counter!("builder.submitted_successful_blocks").increment(1);
269-
break result;
388+
debug!(retries, "successfully submitted block");
389+
break inbound_result;
270390
}
271391
}
272392
};
@@ -278,16 +398,38 @@ impl SubmitTask {
278398
Ok(result)
279399
}
280400

401+
/// Calculates and returns the slot number and its start and end timestamps for the current instant.
402+
fn calculate_slot_window(&self) -> eyre::Result<(u64, u64, u64)> {
403+
let now_ts = self.now();
404+
let current_slot = self.config.slot_calculator.calculate_slot(now_ts);
405+
let (start, end) = self.config.slot_calculator.calculate_slot_window(current_slot);
406+
Ok((current_slot, start, end))
407+
}
408+
409+
/// Returns the current timestamp in seconds since the UNIX epoch.
410+
fn now(&self) -> u64 {
411+
let now = std::time::SystemTime::now();
412+
now.duration_since(UNIX_EPOCH).unwrap().as_secs()
413+
}
414+
415+
/// Task future for the submit task
281416
async fn task_future(self, mut inbound: mpsc::UnboundedReceiver<BuiltBlock>) {
282417
loop {
283418
let Some(block) = inbound.recv().await else {
284419
debug!("upstream task gone");
285420
break;
286421
};
422+
debug!(?block, "submit channel received block");
423+
424+
// TODO: Pass a BlockEnv to this function to give retrying handle inbound access to the block
425+
// env and thus the block number so that we can be sure that we try for only our assigned slots.
287426

427+
// Instead this needs to fire off a task that attempts to land the block for the given slot
428+
// Once that slot is up, it's invalid for the next anyway, so this job can be ephemeral.
288429
if self.retrying_handle_inbound(&block, 3).await.is_err() {
430+
debug!("error handling inbound block");
289431
continue;
290-
}
432+
};
291433
}
292434
}
293435

0 commit comments

Comments
 (0)