Move persist into async part of the sweeper #3819

Open
wants to merge 1 commit into base: main
53 changes: 30 additions & 23 deletions lightning/src/util/sweep.rs
@@ -382,7 +382,7 @@ where
output_spender: O, change_destination_source: D, kv_store: K, logger: L,
) -> Self {
let outputs = Vec::new();
let sweeper_state = Mutex::new(SweeperState { outputs, best_block });
let sweeper_state = Mutex::new(SweeperState { outputs, best_block, dirty: false });
Self {
sweeper_state,
pending_sweep: AtomicBool::new(false),
@@ -446,7 +446,10 @@ where
}
self.persist_state(&*state_lock).map_err(|e| {
log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
})
})?;
state_lock.dirty = false;

Ok(())
}

/// Returns a list of the currently tracked spendable outputs.
@@ -503,11 +506,19 @@ where

// See if there is anything to sweep before requesting a change address.
{
let sweeper_state = self.sweeper_state.lock().unwrap();
let mut sweeper_state = self.sweeper_state.lock().unwrap();

let cur_height = sweeper_state.best_block.height;
let has_respends = sweeper_state.outputs.iter().any(|o| filter_fn(o, cur_height));
if !has_respends {
// If there is nothing to sweep, we still persist the state if it is dirty.
if sweeper_state.dirty {
self.persist_state(&sweeper_state).map_err(|e| {
log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
})?;
sweeper_state.dirty = false;
}

return Ok(());
}
}
@@ -531,7 +542,8 @@ where
.collect();

if respend_descriptors.is_empty() {
// It could be that a tx confirmed and there is now nothing to sweep anymore.
// It could be that a tx confirmed and there is now nothing to sweep anymore. If there is dirty state,
// we'll persist it in the next cycle.
return Ok(());
}

@@ -563,6 +575,7 @@ where
self.persist_state(&sweeper_state).map_err(|e| {
log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
})?;
sweeper_state.dirty = false;

self.broadcaster.broadcast_transactions(&[&spending_tx]);
}
@@ -588,6 +601,8 @@ where
}
true
});

sweeper_state.dirty = true;
Contributor:
Seems this is redundant, but probably also can't hurt.

Contributor Author:
Leaving as is, to at least flag dirty in the functions that change state.

Contributor Author:
Also not doing the RAII pattern. There are just a few places where dirty needs to be cleared, and seems fine for now.
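
For readers who haven't seen the alternative being dismissed here, a rough sketch of what such an RAII guard could look like follows. This is entirely hypothetical (names and structure are not from this PR) and assumes a SweeperState carrying a dirty flag:

use std::ops::{Deref, DerefMut};
use std::sync::{Mutex, MutexGuard};

struct SweeperState {
    // Persisted fields elided for brevity.
    dirty: bool,
}

// Guard that wraps the locked state and marks it dirty when dropped, so a
// caller mutating the state cannot forget to set the flag afterwards.
struct DirtyGuard<'a>(MutexGuard<'a, SweeperState>);

impl<'a> Deref for DirtyGuard<'a> {
    type Target = SweeperState;
    fn deref(&self) -> &SweeperState {
        &self.0
    }
}

impl<'a> DerefMut for DirtyGuard<'a> {
    fn deref_mut(&mut self) -> &mut SweeperState {
        &mut self.0
    }
}

impl<'a> Drop for DirtyGuard<'a> {
    fn drop(&mut self) {
        // Marks the state dirty unconditionally on drop, even if the caller
        // only read it; one reason the manual flag may be the simpler choice.
        self.0.dirty = true;
    }
}

fn with_state_mut(state: &Mutex<SweeperState>) -> DirtyGuard<'_> {
    DirtyGuard(state.lock().unwrap())
}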

}

fn persist_state(&self, sweeper_state: &SweeperState) -> Result<(), io::Error> {
@@ -641,13 +656,17 @@ where
}
}
}

sweeper_state.dirty = true;
}

fn best_block_updated_internal(
&self, sweeper_state: &mut SweeperState, header: &Header, height: u32,
) {
sweeper_state.best_block = BestBlock::new(header.block_hash(), height);
self.prune_confirmed_outputs(sweeper_state);

sweeper_state.dirty = true;
}
}

@@ -671,12 +690,8 @@ where
assert_eq!(state_lock.best_block.height, height - 1,
"Blocks must be connected in chain-order - the connected block height must be one greater than the previous height");

self.transactions_confirmed_internal(&mut *state_lock, header, txdata, height);
self.best_block_updated_internal(&mut *state_lock, header, height);

let _ = self.persist_state(&*state_lock).map_err(|e| {
log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
});
self.transactions_confirmed_internal(&mut state_lock, header, txdata, height);
self.best_block_updated_internal(&mut state_lock, header, height);
}

fn block_disconnected(&self, header: &Header, height: u32) {
@@ -698,9 +713,7 @@ where
}
}

self.persist_state(&*state_lock).unwrap_or_else(|e| {
log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
});
state_lock.dirty = true;
}
}

@@ -720,9 +733,6 @@ where
) {
let mut state_lock = self.sweeper_state.lock().unwrap();
self.transactions_confirmed_internal(&mut *state_lock, header, txdata, height);
self.persist_state(&*state_lock).unwrap_or_else(|e| {
log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
});
}

fn transaction_unconfirmed(&self, txid: &Txid) {
@@ -743,18 +753,13 @@ where
.filter(|o| o.status.confirmation_height() >= Some(unconf_height))
.for_each(|o| o.status.unconfirmed());

self.persist_state(&*state_lock).unwrap_or_else(|e| {
log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
});
state_lock.dirty = true;
}
}

fn best_block_updated(&self, header: &Header, height: u32) {
let mut state_lock = self.sweeper_state.lock().unwrap();
self.best_block_updated_internal(&mut *state_lock, header, height);
let _ = self.persist_state(&*state_lock).map_err(|e| {
log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
});
self.best_block_updated_internal(&mut state_lock, header, height);
}

fn get_relevant_txids(&self) -> Vec<(Txid, u32, Option<BlockHash>)> {
@@ -783,11 +788,13 @@ where
struct SweeperState {
outputs: Vec<TrackedSpendableOutput>,
best_block: BestBlock,
dirty: bool,
Contributor:

We had that discussion before: I'd really prefer it if we don't mix runtime state into the SweeperState, which is precisely the object we use to isolate the persisted state from the non-persisted state; this also avoids having to hand a mutable state to persist_state.

Contributor Author:

Last time, I created that isolation, but then reverted it in favor of an atomic boolean. Which direction would you suggest taking with the dirty flag? I don't think I'd like another atomic boolean. I already didn't like the first one, and two independent sync primitives expand the state space even further.

Contributor:

Yes, I would much prefer to just have another needs_persist: AtomicBool on OutputSweeper directly.

Contributor Author:

The kind of flow I'd like to avoid is: update state, unlock state, mark dirty, with a concurrent persist happening between the unlock and the mark-dirty, ultimately leaving clean state marked as dirty that will be re-persisted without changes. Of course the re-persist isn't the biggest problem, but I am cautious of requiring devs to reason through scenarios like the one above.
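
A minimal sketch of that interleaving, assuming a hypothetical separate needs_persist: AtomicBool next to the mutex-guarded state (illustrative names, not the PR's code):

use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Mutex;

struct Sweeper {
    state: Mutex<Vec<u32>>,     // stand-in for the real SweeperState
    needs_persist: AtomicBool,  // runtime-only "dirty" marker
}

impl Sweeper {
    fn update(&self) {
        {
            let mut state = self.state.lock().unwrap();
            state.push(42); // mutate the state
        } // lock released here ...

        // ... if a concurrent persist() runs right now, it writes the new
        // state and clears the flag; the store below then re-marks already
        // persisted state as dirty, triggering a redundant re-persist later.
        self.needs_persist.store(true, Ordering::Release);
    }

    fn persist(&self) {
        let state = self.state.lock().unwrap();
        let _serialized = format!("{:?}", *state); // stand-in for the KV store write
        self.needs_persist.store(false, Ordering::Release);
    }
}

Marking the flag while the lock is still held avoids this particular interleaving, which is essentially what keeping the flag inside the mutex-protected state enforces by construction.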

Contributor Author:

Try it out here: main...joostjager:rust-lightning:sweeper-async-persist-atomicbool

I definitely feel all of those concerns popping up when I use that atomic bool.

Contributor (@tnull, Jun 5, 2025):

So there are two aspects here to why I dislike the idea of pushing the runtime flags into SweeperState:

  1. We intentionally had SweeperState hold the 'actual'/persisted state of the sweeper, not any runtime-specific behavior. The (_unused, dirty, (static_value, false)) entry in the persistence logic really just shows that you unnecessarily broke the separation of data and logic we had here. If we think that it all should be locked under a single Mutex, we'd need to create a wrapper struct holding both the SweeperState and the runtime-specific bool to maintain that.
  2. However, secondly, I don't think we should introduce the lock contention and block the background processor that is woken and processing an 'I need persist' notification just to check whether it actually still needs to re-persist. We don't have strong guarantees about when the BP responds to a notification, so if it's mid-loop already it might take a while until it gets back around to actually processing the persist. Also note that what we do in this PR is effectively split the persistence in two: sync in-line persistence for stuff that really needs to happen before we return (track_spendable_outputs), and 'lazy'/async persistence that will happen some time after block connection. For the latter we have relaxed consistency guarantees anyways, and we basically increase the chances of missing a persistence either way. So I don't quite understand where the concern about race conditions in this 'lazy' case comes from. I don't see why we'd favor lock contention over (theoretical) relaxed consistency guarantees for a case where we already knowingly opt into the latter.

It might also be noteworthy that post-async KVStore we might need to rework the current pattern anyways, as we wouldn't be able to hold the MutexGuard across the write().await boundary. We'll figure that out when we get there, but it could mean that we need to clone the to-be persisted state before dropping the lock, and actually making the call, which would be another reason to not include ~unrelated fields in the state object.

TLDR: I'd prefer to continue to have the runtime bools live as AtomicBools on OutputSweeper directly, but if you guys really worry about any races for the already-lazy case, we should at the very least solve it by wrapping the two fields and SweeperState in yet another struct, maintaining the data/logic separation.
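
For illustration, the wrapper alternative described in this comment could look roughly as follows. Names are made up and the persistence call is stubbed out, so treat this as a sketch of the idea rather than the PR's implementation:

use std::io;
use std::sync::Mutex;

// Persisted state only: this is the struct that gets serialized, and it never
// carries runtime-only flags.
struct SweeperState {
    // outputs, best_block, ... elided
}

// Runtime wrapper: lives under the same Mutex, but only its `persistent`
// part is ever handed to persist_state, and only immutably.
struct RuntimeSweeperState {
    persistent: SweeperState,
    dirty: bool,
}

struct OutputSweeper {
    sweeper_state: Mutex<RuntimeSweeperState>,
}

impl OutputSweeper {
    fn persist_state(&self, _state: &SweeperState) -> Result<(), io::Error> {
        // Serialize and write only the persisted part (stubbed out here).
        Ok(())
    }

    fn persist_if_dirty(&self) -> Result<(), io::Error> {
        let mut state = self.sweeper_state.lock().unwrap();
        if state.dirty {
            self.persist_state(&state.persistent)?;
            state.dirty = false;
        }
        Ok(())
    }
}

This keeps persist_state taking an immutable &SweeperState while both pieces still sit under one lock, which is the trade-off being debated in this thread. (As I read the TLV macros, the (_unused, dirty, (static_value, false)) entry mentioned in point 1 is how the in-state variant achieves the same effect: the field is skipped when writing and set to false when reading.)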

Contributor Author (@joostjager, Jun 9, 2025):

After more exploration of the atomic-bool direction, I couldn't get rid of the suggested or real race conditions. I kept the dirty flag as part of the state, but separated it from the persistent fields. Let me know what you think.

Regarding that separation, I do want to point to #3618 (comment). Opinions and also current practices vary.

Contributor:
@tnull re: (1) can you elaborate on why you see the (_unused, dirty, (static_value, false)) as bad? It's not obvious to me why this is different from the other places in the codebase where we do this, but I might be missing something.

Re: (2) I'm not sure I'm following, because even if we use an atomic bool we'll still take the sweeper lock at least once in regenerate_and_broadcast_spend_if_necessary -- this additional instance doesn't seem unique? I'm not saying we'll definitely have races with the atomic bool, just that readers have to think through whether we'll miss a persist or do an extra persist unless everything is changed under one lock, so I want to make sure I understand why it's worth it not to.

Contributor:

> After more exploration of the atomic bool direction, I couldn't get rid of the suggested or real race conditions. I kept the dirty flag as part of the state, and separated it from the persistent fields. Let me know what you think.

Yeah, as mentioned above and discussed offline, we can go this direction if you guys prefer it, as long as we maintain the separation of concerns, and drop the mutable reference from persist_state.

> @tnull re: (1) can you elaborate on why you see the (_unused, dirty, (static_value, false)) as bad? Not obvious to me why this is different from the other places in the codebase where we do this but might be missing something.

The separation of data and logic is a nice principle to uphold, as it often makes reasoning about the code and later refactorings easier. I agree that we rarely follow that principle, but the sweeper is one of the few places where we did, and consciously chose to do so during the original review discussion. Would be great to maintain that.

> Re: (2) I'm not sure I'm following because even if we use an atomic bool we'll still take the sweeper lock at least once in regenerate_and_broadcast_spend_if_necessary -- this additional instance doesn't seem unique?

Right, but block connections might be driven by another concurrent task, which might lead to unnecessarily re-acquiring the mutex guard just to check if we still have to persist once the BP loop gets around to it.

Contributor Author:

> the sweeper is one of the few places where we did

I don't think this is the case, because before #3734 (where state separation was also a topic of discussion), there was no runtime-only state in the sweeper?

In the last push on this PR, I've added the state separation but not the atomic boolean.

}

impl_writeable_tlv_based!(SweeperState, {
(0, outputs, required_vec),
(2, best_block, required),
(_unused, dirty, (static_value, false)),
});

/// A `enum` signalling to the [`OutputSweeper`] that it should delay spending an output until a