@@ -111,6 +111,7 @@ impl SubmitTask {
111
111
async fn next_host_block_height ( & self ) -> eyre:: Result < u64 > {
112
112
let result = self . provider ( ) . get_block_number ( ) . await ?;
113
113
let next = result. checked_add ( 1 ) . ok_or_else ( || eyre ! ( "next host block height overflow" ) ) ?;
114
+ debug ! ( next, "next host block height" ) ;
114
115
Ok ( next)
115
116
}
116
117
@@ -146,9 +147,7 @@ impl SubmitTask {
146
147
error = ?maybe_error,
147
148
"error in transaction simulation"
148
149
) ;
149
- if let Err ( e) = maybe_error {
150
- return Err ( e) ;
151
- }
150
+ maybe_error?;
152
151
}
153
152
154
153
Ok ( tx)
@@ -257,7 +256,6 @@ impl SubmitTask {
257
256
nonce = ?tx. nonce,
258
257
"sending transaction to network"
259
258
) ;
260
-
261
259
262
260
// assign the nonce and fill the rest of the values
263
261
let SendableTx :: Envelope ( tx) = self . provider ( ) . fill ( tx) . await ? else {
@@ -285,6 +283,9 @@ impl SubmitTask {
285
283
return Ok ( ControlFlow :: Skip ) ;
286
284
}
287
285
286
+ // Okay so the code gets all the way to this log
287
+ // but we don't see the tx hash in the logs or in the explorer,
288
+ // not even as a failed TX, just not at all.
288
289
info ! (
289
290
tx_hash = %tx. tx_hash( ) ,
290
291
ru_chain_id = %resp. req. ru_chain_id,
@@ -336,22 +337,31 @@ impl SubmitTask {
336
337
let span = debug_span ! ( "SubmitTask::retrying_handle_inbound" , retries) ;
337
338
debug ! ( retries, "number of retries" ) ;
338
339
339
- let inbound_result = match self . handle_inbound ( retries, block) . instrument ( span. clone ( ) ) . await {
340
+ let inbound_result = match self
341
+ . handle_inbound ( retries, block)
342
+ . instrument ( span. clone ( ) )
343
+ . await
344
+ {
340
345
Ok ( control_flow) => {
341
346
debug ! ( ?control_flow, retries, "successfully handled inbound block" ) ;
342
347
control_flow
343
348
}
344
349
Err ( err) => {
350
+ // Log the retry attempt
345
351
retries += 1 ;
346
352
error ! ( error = %err, "error handling inbound block" ) ;
347
353
354
+ // Delay until next slot if we get a 403 error
348
355
if err. to_string ( ) . contains ( "403" ) {
349
- debug ! ( "403 error - skipping block" ) ;
350
- let ( slot_number, start, end) = self . calculate_slot_window ( ) ?;
351
- debug ! ( slot_number, start, end, "403 sleep until skipping block" ) ;
352
- // TODO: Sleep until the end of the next slot and return retry
353
- return Ok ( ControlFlow :: Done ) ;
356
+ let ( slot_number, _, end) = self . calculate_slot_window ( ) ?;
357
+ let now = self . now ( ) ;
358
+ if end > now {
359
+ let sleep_duration = std:: time:: Duration :: from_secs ( end - now) ;
360
+ debug ! ( sleep_duration = ?sleep_duration, slot_number, "403 detected - sleeping until end of slot" ) ;
361
+ tokio:: time:: sleep ( sleep_duration) . await ;
362
+ }
354
363
}
364
+
355
365
ControlFlow :: Retry
356
366
}
357
367
} ;
@@ -413,19 +423,36 @@ impl SubmitTask {
413
423
}
414
424
415
425
/// Task future for the submit task
426
+ /// NB: This task assumes that the simulator will only send it blocks for
427
+ /// slots that it's assigned.
416
428
async fn task_future ( self , mut inbound : mpsc:: UnboundedReceiver < BuiltBlock > ) {
429
+ // Holds a reference to the last block we attempted to submit
430
+ let mut last_block_attempted: u64 = 0 ;
431
+
417
432
loop {
433
+ // Wait to receive a new block
418
434
let Some ( block) = inbound. recv ( ) . await else {
419
435
debug ! ( "upstream task gone" ) ;
420
436
break ;
421
437
} ;
422
- debug ! ( ?block, "submit channel received block" ) ;
438
+ debug ! ( block_number = block. block_number( ) , ?block, "submit channel received block" ) ;
439
+
440
+ // Check if a block number was set and skip if not
441
+ if block. block_number ( ) == 0 {
442
+ debug ! ( "block number is 0 - skipping" ) ;
443
+ continue ;
444
+ }
445
+
446
+ // Only attempt each block number once
447
+ if block. block_number ( ) == last_block_attempted {
448
+ debug ! ( "block number is unchanged from last attempt - skipping" ) ;
449
+ continue ;
450
+ }
423
451
424
- // TODO: Pass a BlockEnv to this function to give retrying handle inbound access to the block
425
- // env and thus the block number so that we can be sure that we try for only our assigned slots.
452
+ // This means we have encountered a new block, so reset the last block attempted
453
+ last_block_attempted = block. block_number ( ) ;
454
+ debug ! ( last_block_attempted, "resetting last block attempted" ) ;
426
455
427
- // Instead this needs to fire off a task that attempts to land the block for the given slot
428
- // Once that slot is up, it's invalid for the next anyway, so this job can be ephemeral.
429
456
if self . retrying_handle_inbound ( & block, 3 ) . await . is_err ( ) {
430
457
debug ! ( "error handling inbound block" ) ;
431
458
continue ;
0 commit comments