Skip to content

Commit 7cb98ad

Browse files
committed
Persist sweeper state as part of background process
To prepare for an async kv store trait that must be awaited, this commit moves the kv store calls from the chain notification handlers to the background process. It uses a dirty flag to communicate that there is something to persist. The block height is part of the persisted data. If that data does not make it to disk, the chain notifications are replayed after restart.
1 parent afe4285 commit 7cb98ad

File tree

1 file changed

+30
-23
lines changed

1 file changed

+30
-23
lines changed

lightning/src/util/sweep.rs

Lines changed: 30 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -382,7 +382,7 @@ where
382382
output_spender: O, change_destination_source: D, kv_store: K, logger: L,
383383
) -> Self {
384384
let outputs = Vec::new();
385-
let sweeper_state = Mutex::new(SweeperState { outputs, best_block });
385+
let sweeper_state = Mutex::new(SweeperState { outputs, best_block, dirty: false });
386386
Self {
387387
sweeper_state,
388388
pending_sweep: AtomicBool::new(false),
@@ -446,7 +446,10 @@ where
446446
}
447447
self.persist_state(&*state_lock).map_err(|e| {
448448
log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
449-
})
449+
})?;
450+
state_lock.dirty = false;
451+
452+
Ok(())
450453
}
451454

452455
/// Returns a list of the currently tracked spendable outputs.
@@ -503,11 +506,19 @@ where
503506

504507
// See if there is anything to sweep before requesting a change address.
505508
{
506-
let sweeper_state = self.sweeper_state.lock().unwrap();
509+
let mut sweeper_state = self.sweeper_state.lock().unwrap();
507510

508511
let cur_height = sweeper_state.best_block.height;
509512
let has_respends = sweeper_state.outputs.iter().any(|o| filter_fn(o, cur_height));
510513
if !has_respends {
514+
// If there is nothing to sweep, we still persist the state if it is dirty.
515+
if sweeper_state.dirty {
516+
self.persist_state(&sweeper_state).map_err(|e| {
517+
log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
518+
})?;
519+
sweeper_state.dirty = false;
520+
}
521+
511522
return Ok(());
512523
}
513524
}
@@ -531,7 +542,8 @@ where
531542
.collect();
532543

533544
if respend_descriptors.is_empty() {
534-
// It could be that a tx confirmed and there is now nothing to sweep anymore.
545+
// It could be that a tx confirmed and there is now nothing to sweep anymore. If there is dirty state,
546+
// we'll persist it in the next cycle.
535547
return Ok(());
536548
}
537549

@@ -563,6 +575,7 @@ where
563575
self.persist_state(&sweeper_state).map_err(|e| {
564576
log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
565577
})?;
578+
sweeper_state.dirty = false;
566579

567580
self.broadcaster.broadcast_transactions(&[&spending_tx]);
568581
}
@@ -588,6 +601,8 @@ where
588601
}
589602
true
590603
});
604+
605+
sweeper_state.dirty = true;
591606
}
592607

593608
fn persist_state(&self, sweeper_state: &SweeperState) -> Result<(), io::Error> {
@@ -641,13 +656,17 @@ where
641656
}
642657
}
643658
}
659+
660+
sweeper_state.dirty = true;
644661
}
645662

646663
fn best_block_updated_internal(
647664
&self, sweeper_state: &mut SweeperState, header: &Header, height: u32,
648665
) {
649666
sweeper_state.best_block = BestBlock::new(header.block_hash(), height);
650667
self.prune_confirmed_outputs(sweeper_state);
668+
669+
sweeper_state.dirty = true;
651670
}
652671
}
653672

@@ -671,12 +690,8 @@ where
671690
assert_eq!(state_lock.best_block.height, height - 1,
672691
"Blocks must be connected in chain-order - the connected block height must be one greater than the previous height");
673692

674-
self.transactions_confirmed_internal(&mut *state_lock, header, txdata, height);
675-
self.best_block_updated_internal(&mut *state_lock, header, height);
676-
677-
let _ = self.persist_state(&*state_lock).map_err(|e| {
678-
log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
679-
});
693+
self.transactions_confirmed_internal(&mut state_lock, header, txdata, height);
694+
self.best_block_updated_internal(&mut state_lock, header, height);
680695
}
681696

682697
fn block_disconnected(&self, header: &Header, height: u32) {
@@ -698,9 +713,7 @@ where
698713
}
699714
}
700715

701-
self.persist_state(&*state_lock).unwrap_or_else(|e| {
702-
log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
703-
});
716+
state_lock.dirty = true;
704717
}
705718
}
706719

@@ -720,9 +733,6 @@ where
720733
) {
721734
let mut state_lock = self.sweeper_state.lock().unwrap();
722735
self.transactions_confirmed_internal(&mut *state_lock, header, txdata, height);
723-
self.persist_state(&*state_lock).unwrap_or_else(|e| {
724-
log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
725-
});
726736
}
727737

728738
fn transaction_unconfirmed(&self, txid: &Txid) {
@@ -743,18 +753,13 @@ where
743753
.filter(|o| o.status.confirmation_height() >= Some(unconf_height))
744754
.for_each(|o| o.status.unconfirmed());
745755

746-
self.persist_state(&*state_lock).unwrap_or_else(|e| {
747-
log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
748-
});
756+
state_lock.dirty = true;
749757
}
750758
}
751759

752760
fn best_block_updated(&self, header: &Header, height: u32) {
753761
let mut state_lock = self.sweeper_state.lock().unwrap();
754-
self.best_block_updated_internal(&mut *state_lock, header, height);
755-
let _ = self.persist_state(&*state_lock).map_err(|e| {
756-
log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
757-
});
762+
self.best_block_updated_internal(&mut state_lock, header, height);
758763
}
759764

760765
fn get_relevant_txids(&self) -> Vec<(Txid, u32, Option<BlockHash>)> {
@@ -783,11 +788,13 @@ where
783788
struct SweeperState {
784789
outputs: Vec<TrackedSpendableOutput>,
785790
best_block: BestBlock,
791+
dirty: bool,
786792
}
787793

788794
impl_writeable_tlv_based!(SweeperState, {
789795
(0, outputs, required_vec),
790796
(2, best_block, required),
797+
(_unused, dirty, (static_value, false)),
791798
});
792799

793800
/// A `enum` signalling to the [`OutputSweeper`] that it should delay spending an output until a

0 commit comments

Comments
 (0)