Add archive_fully_resolved_monitors to ChainMonitor #2964

Merged (2 commits) on Apr 18, 2024
3 changes: 3 additions & 0 deletions fuzz/src/utils/test_persister.rs
@@ -17,4 +17,7 @@ impl chainmonitor::Persist<TestChannelSigner> for TestPersister {
fn update_persisted_channel(&self, _funding_txo: OutPoint, _update: Option<&channelmonitor::ChannelMonitorUpdate>, _data: &channelmonitor::ChannelMonitor<TestChannelSigner>, _update_id: MonitorUpdateId) -> chain::ChannelMonitorUpdateStatus {
self.update_ret.lock().unwrap().clone()
}

fn archive_persisted_channel(&self, _: OutPoint) {
}
}
40 changes: 40 additions & 0 deletions lightning/src/chain/chainmonitor.rs
@@ -194,6 +194,11 @@ pub trait Persist<ChannelSigner: WriteableEcdsaChannelSigner> {
///
/// [`Writeable::write`]: crate::util::ser::Writeable::write
fn update_persisted_channel(&self, channel_funding_outpoint: OutPoint, update: Option<&ChannelMonitorUpdate>, data: &ChannelMonitor<ChannelSigner>, update_id: MonitorUpdateId) -> ChannelMonitorUpdateStatus;
/// Prevents the channel monitor from being loaded on startup.
///
/// Archiving the data in a backup location (rather than deleting it fully) is useful for
/// hedging against data loss in case of unexpected failure.
fn archive_persisted_channel(&self, channel_funding_outpoint: OutPoint);
Contributor:

At multiple places I see error handling along the lines of "got an error -> ignore it".
Should we consider returning a Result<(), io::Error> from here and ignoring the error wherever archive_persisted_channel is used?

It could also be helpful for users implementing their own Persist trait if we ignore it on our own.

Collaborator:

I'm not sure why LDK swallowing a user-provided error is better than making it visible to the user that LDK will do nothing with their error.

Contributor:

Currently, users will be left wondering what to do with an error while implementing this interface.
It would have been nice in the sense that we take the decision for them, or handle the error if we can.

IMO, a user-facing interface should only be non-fallible if there is no way we can handle it.

Anyway, I'm not holding this PR for this; it is OK if we leave it like this.

Collaborator:

> imo, a user facing interface should only be non-fallible if there is no way we can handle it.

I think I take a bit of a different view. In this case we're not "handling it" but rather ignoring it. Adding an error type would be confusing, as it would imply to the user that we'll do something with the error and that they need to correctly propagate errors, whereas that's not actually true: the error will be ignored, and if they hit an error they should consider whether it is fatal for themselves.
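As an illustration of the trade-off discussed above, a user's own Persist implementation can decide locally what to do when archival fails, since the trait method returns (). The following is a minimal sketch, not part of this PR; the MyPersister type, its directory fields, and the logging choice are assumptions:

use std::fs;
use std::path::PathBuf;

struct MyPersister {
    // Hypothetical layout: live monitors in one directory, archived ones in another.
    monitor_dir: PathBuf,
    archive_dir: PathBuf,
}

impl MyPersister {
    // Would be called from a `Persist::archive_persisted_channel` implementation.
    // The trait cannot propagate errors, so any failure is handled (here: logged) locally.
    fn archive_monitor_file(&self, key: &str) {
        let from = self.monitor_dir.join(key);
        let to = self.archive_dir.join(key);
        if let Err(e) = fs::rename(&from, &to) {
            // Archival is best-effort; decide for yourself whether a failure is fatal.
            eprintln!("Failed to archive monitor {}: {}", key, e);
        }
    }
}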

}

struct MonitorHolder<ChannelSigner: WriteableEcdsaChannelSigner> {
@@ -656,6 +661,41 @@ where C::Target: chain::Filter,
}
}
}

/// Archives fully resolved channel monitors by calling [`Persist::archive_persisted_channel`].
///
/// This is useful for pruning fully resolved monitors from the monitor set and primary
/// storage so they are not kept in memory and reloaded on restart.
///
/// Should be called occasionally (once every handful of blocks or on startup).
///
/// Depending on the implementation of [`Persist::archive_persisted_channel`] the monitor
/// data could be moved to an archive location or removed entirely.
pub fn archive_fully_resolved_channel_monitors(&self) {
let mut have_monitors_to_prune = false;
for (_, monitor_holder) in self.monitors.read().unwrap().iter() {
let logger = WithChannelMonitor::from(&self.logger, &monitor_holder.monitor);
if monitor_holder.monitor.is_fully_resolved(&logger) {
have_monitors_to_prune = true;
}
}
if have_monitors_to_prune {
let mut monitors = self.monitors.write().unwrap();
monitors.retain(|funding_txo, monitor_holder| {
let logger = WithChannelMonitor::from(&self.logger, &monitor_holder.monitor);
if monitor_holder.monitor.is_fully_resolved(&logger) {
log_info!(logger,
"Archiving fully resolved ChannelMonitor for funding txo {}",
funding_txo
);
self.persister.archive_persisted_channel(*funding_txo);
false
} else {
true
}
});
}
}
}
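A caller only needs to invoke this occasionally. A minimal sketch of wiring it into an application's own block-connected hook follows; MyChainMonitor (a stand-in for a fully parameterized ChainMonitor type alias) and the once-per-100-blocks cadence are assumptions:

// `chain_monitor` is assumed to be the application's fully parameterized
// `ChainMonitor<..>` type alias, here called `MyChainMonitor`.
fn on_block_connected(height: u32, chain_monitor: &MyChainMonitor) {
    // The call only archives monitors whose balances have been empty for the full
    // threshold, so running it every ~100 blocks (or once at startup) is plenty.
    if height % 100 == 0 {
        chain_monitor.archive_fully_resolved_channel_monitors();
    }
}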

impl<ChannelSigner: WriteableEcdsaChannelSigner, C: Deref, T: Deref, F: Deref, L: Deref, P: Deref>
54 changes: 54 additions & 0 deletions lightning/src/chain/channelmonitor.rs
@@ -935,6 +935,9 @@ pub(crate) struct ChannelMonitorImpl<Signer: WriteableEcdsaChannelSigner> {
/// Ordering of tuple data: (their_per_commitment_point, feerate_per_kw, to_broadcaster_sats,
/// to_countersignatory_sats)
initial_counterparty_commitment_info: Option<(PublicKey, u32, u64, u64)>,

/// The first block height at which we had no remaining claimable balances.
balances_empty_height: Option<u32>,
}

/// Transaction outputs to watch for on-chain spends.
@@ -1145,6 +1148,7 @@ impl<Signer: WriteableEcdsaChannelSigner> Writeable for ChannelMonitorImpl<Signe
(15, self.counterparty_fulfilled_htlcs, required),
(17, self.initial_counterparty_commitment_info, option),
(19, self.channel_id, required),
(21, self.balances_empty_height, option),
});

Ok(())
@@ -1328,6 +1332,7 @@ impl<Signer: WriteableEcdsaChannelSigner> ChannelMonitor<Signer> {
best_block,
counterparty_node_id: Some(counterparty_node_id),
initial_counterparty_commitment_info: None,
balances_empty_height: None,
})
}

@@ -1856,6 +1861,52 @@ impl<Signer: WriteableEcdsaChannelSigner> ChannelMonitor<Signer> {
spendable_outputs
}

/// Checks whether the monitor is fully resolved. A resolved monitor is one that has claimed all of
/// its outputs and balances (i.e. [`Self::get_claimable_balances`] returns an empty set).
///
/// This function returns true only if [`Self::get_claimable_balances`] has been empty for at least
/// 2016 blocks, as an additional protection against any bugs resulting in spuriously empty balance sets.
Contributor:

nit: doc update to 4032 blocks

pub fn is_fully_resolved<L: Logger>(&self, logger: &L) -> bool {
let mut is_all_funds_claimed = self.get_claimable_balances().is_empty();
let current_height = self.current_best_block().height;
let mut inner = self.inner.lock().unwrap();

if is_all_funds_claimed {
if !inner.funding_spend_seen {
debug_assert!(false, "We should see funding spend by the time a monitor clears out");
is_all_funds_claimed = false;
}
}

match (inner.balances_empty_height, is_all_funds_claimed) {
(Some(balances_empty_height), true) => {
// Claimed all funds, check if reached the blocks threshold.
const BLOCKS_THRESHOLD: u32 = 4032; // ~four weeks
return current_height >= balances_empty_height + BLOCKS_THRESHOLD;
},
(Some(_), false) => {
// previously assumed we claimed all funds, but we have new funds to claim.
// Should not happen in practice.
debug_assert!(false, "Thought we were done claiming funds, but claimable_balances now has entries");
log_error!(logger,
"WARNING: LDK thought it was done claiming all the available funds in the ChannelMonitor for channel {}, but later decided it had more to claim. This is potentially an important bug in LDK, please report it at https://github.com/lightningdevkit/rust-lightning/issues/new",
inner.get_funding_txo().0);
inner.balances_empty_height = None;
false
},
(None, true) => {
// Claimed all funds but `balances_empty_height` is None. It is set to the
// current block height.
inner.balances_empty_height = Some(current_height);
Contributor:

should also log here.

Contributor:

We should add a log: "Will archive fully resolved ChannelMonitor for funding txo {}, after 4032 blocks".
The idea being it is well before the actual archive and can serve as a warning.

Collaborator:

Sorry I missed this one somehow.

false
},
(None, false) => {
// Have funds to claim.
false
},
}
}
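The log the reviewers ask for in the (None, true) arm might look roughly like the sketch below; its placement and wording are assumptions, and BLOCKS_THRESHOLD would need to be hoisted out of the (Some(_), true) arm to be in scope there:

// Inside the `(None, true)` arm, right after `balances_empty_height` is first set:
log_info!(logger,
    "Will archive fully resolved ChannelMonitor for funding txo {} after {} blocks",
    inner.get_funding_txo().0, BLOCKS_THRESHOLD);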

#[cfg(test)]
pub fn get_counterparty_payment_script(&self) -> ScriptBuf {
self.inner.lock().unwrap().counterparty_payment_script.clone()
@@ -4632,6 +4683,7 @@ impl<'a, 'b, ES: EntropySource, SP: SignerProvider> ReadableArgs<(&'a ES, &'b SP
let mut spendable_txids_confirmed = Some(Vec::new());
let mut counterparty_fulfilled_htlcs = Some(new_hash_map());
let mut initial_counterparty_commitment_info = None;
let mut balances_empty_height = None;
let mut channel_id = None;
read_tlv_fields!(reader, {
(1, funding_spend_confirmed, option),
@@ -4644,6 +4696,7 @@ impl<'a, 'b, ES: EntropySource, SP: SignerProvider> ReadableArgs<(&'a ES, &'b SP
(15, counterparty_fulfilled_htlcs, option),
(17, initial_counterparty_commitment_info, option),
(19, channel_id, option),
(21, balances_empty_height, option),
});

// `HolderForceClosedWithInfo` replaced `HolderForceClosed` in v0.0.122. If we have both
@@ -4722,6 +4775,7 @@ impl<'a, 'b, ES: EntropySource, SP: SignerProvider> ReadableArgs<(&'a ES, &'b SP
best_block,
counterparty_node_id,
initial_counterparty_commitment_info,
balances_empty_height,
})))
}
}
54 changes: 54 additions & 0 deletions lightning/src/ln/monitor_tests.rs
@@ -158,6 +158,60 @@ fn revoked_output_htlc_resolution_timing() {
expect_payment_failed!(nodes[1], payment_hash_1, false);
}

#[test]
fn archive_fully_resolved_monitors() {
// Test that we can archive a fully resolved channel monitor.
let chanmon_cfgs = create_chanmon_cfgs(2);
let node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
let mut user_config = test_default_channel_config();
let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[Some(user_config), Some(user_config)]);
let mut nodes = create_network(2, &node_cfgs, &node_chanmgrs);

let (_, _, chan_id, funding_tx) =
create_announced_chan_between_nodes_with_value(&nodes, 0, 1, 1_000_000, 1_000_000);

nodes[0].node.close_channel(&chan_id, &nodes[1].node.get_our_node_id()).unwrap();
Contributor:

This test case starts with a closed channel; is it worth attempting archiving during other states as well, to ensure we don't archive?
(Could be helpful for catching a regression.)

Collaborator:

There are a handful of other tests in this file that at least test that get_claimable_balances is correct in all the states we could enumerate, so it should at least be safe there. We could add more assertions that the monitor isn't prunable in those tests later.
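A hedged sketch of such a regression assertion, reusing the calls this test already makes (not part of this PR): invoking the archiver before the channel is closed should leave the monitor in place.

// Before any shutdown messages are exchanged, archiving must be a no-op.
nodes[0].chain_monitor.chain_monitor.archive_fully_resolved_channel_monitors();
assert_eq!(nodes[0].chain_monitor.chain_monitor.list_monitors().len(), 1);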

let node_0_shutdown = get_event_msg!(nodes[0], MessageSendEvent::SendShutdown, nodes[1].node.get_our_node_id());
nodes[1].node.handle_shutdown(&nodes[0].node.get_our_node_id(), &node_0_shutdown);
let node_1_shutdown = get_event_msg!(nodes[1], MessageSendEvent::SendShutdown, nodes[0].node.get_our_node_id());
nodes[0].node.handle_shutdown(&nodes[1].node.get_our_node_id(), &node_1_shutdown);

let node_0_closing_signed = get_event_msg!(nodes[0], MessageSendEvent::SendClosingSigned, nodes[1].node.get_our_node_id());
nodes[1].node.handle_closing_signed(&nodes[0].node.get_our_node_id(), &node_0_closing_signed);
let node_1_closing_signed = get_event_msg!(nodes[1], MessageSendEvent::SendClosingSigned, nodes[0].node.get_our_node_id());
nodes[0].node.handle_closing_signed(&nodes[1].node.get_our_node_id(), &node_1_closing_signed);
let (_, node_0_2nd_closing_signed) = get_closing_signed_broadcast!(nodes[0].node, nodes[1].node.get_our_node_id());
nodes[1].node.handle_closing_signed(&nodes[0].node.get_our_node_id(), &node_0_2nd_closing_signed.unwrap());
let (_, _) = get_closing_signed_broadcast!(nodes[1].node, nodes[0].node.get_our_node_id());

let shutdown_tx = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().split_off(0);

mine_transaction(&nodes[0], &shutdown_tx[0]);
mine_transaction(&nodes[1], &shutdown_tx[0]);

connect_blocks(&nodes[0], 6);
connect_blocks(&nodes[1], 6);

check_closed_event!(nodes[0], 1, ClosureReason::LocallyInitiatedCooperativeClosure, [nodes[1].node.get_our_node_id()], 1000000);
check_closed_event!(nodes[1], 1, ClosureReason::CounterpartyInitiatedCooperativeClosure, [nodes[0].node.get_our_node_id()], 1000000);

assert_eq!(nodes[0].chain_monitor.chain_monitor.list_monitors().len(), 1);
// First archive should set balances_empty_height to current block height
nodes[0].chain_monitor.chain_monitor.archive_fully_resolved_channel_monitors();
assert_eq!(nodes[0].chain_monitor.chain_monitor.list_monitors().len(), 1);
connect_blocks(&nodes[0], 4032);
// Second call after 4032 blocks, should archive the monitor
nodes[0].chain_monitor.chain_monitor.archive_fully_resolved_channel_monitors();
// Should have no monitors left
assert_eq!(nodes[0].chain_monitor.chain_monitor.list_monitors().len(), 0);
// Remove the corresponding outputs and transactions the chain source is
// watching. This is to make sure the `Drop` function assertions pass.
nodes.get_mut(0).unwrap().chain_source.remove_watched_txn_and_outputs(
OutPoint { txid: funding_tx.txid(), index: 0 },
funding_tx.output[0].script_pubkey.clone()
);
}

fn do_chanmon_claim_value_coop_close(anchors: bool) {
// Tests `get_claimable_balances` returns the correct values across a simple cooperative claim.
// Specifically, this tests that the channel non-HTLC balances show up in
55 changes: 55 additions & 0 deletions lightning/src/util/persist.rs
@@ -56,6 +56,11 @@ pub const CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE: &str = "";
/// The primary namespace under which [`ChannelMonitorUpdate`]s will be persisted.
pub const CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE: &str = "monitor_updates";

/// The primary namespace under which archived [`ChannelMonitor`]s will be persisted.
pub const ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE: &str = "archived_monitors";
/// The secondary namespace under which archived [`ChannelMonitor`]s will be persisted.
pub const ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE: &str = "";

/// The primary namespace under which the [`NetworkGraph`] will be persisted.
pub const NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE: &str = "";
/// The secondary namespace under which the [`NetworkGraph`] will be persisted.
@@ -212,6 +217,33 @@ impl<ChannelSigner: WriteableEcdsaChannelSigner, K: KVStore + ?Sized> Persist<Ch
Err(_) => chain::ChannelMonitorUpdateStatus::UnrecoverableError
}
}

fn archive_persisted_channel(&self, funding_txo: OutPoint) {
let monitor_name = MonitorName::from(funding_txo);
let monitor = match self.read(
CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE,
monitor_name.as_str(),
) {
Ok(monitor) => monitor,
Err(_) => return
};
match self.write(
ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE,
monitor_name.as_str(),
&monitor,
) {
Ok(()) => {}
Err(_e) => return
};
let _ = self.remove(
CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE,
monitor_name.as_str(),
true,
);
}
}
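Because the archived copy lives under the archive namespace, an application can read it back later for inspection. A minimal sketch follows; the helper name is hypothetical, and it assumes MonitorName and the namespace constants are importable as shown:

use lightning::chain::transaction::OutPoint;
use lightning::util::persist::{
    KVStore, MonitorName,
    ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
    ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE,
};

// Hypothetical helper: fetch the raw serialized bytes of an archived monitor.
fn read_archived_monitor_bytes<K: KVStore>(
    store: &K, funding_txo: OutPoint,
) -> Result<Vec<u8>, std::io::Error> {
    let monitor_name = MonitorName::from(funding_txo);
    store.read(
        ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
        ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE,
        monitor_name.as_str(),
    )
}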

/// Read previously persisted [`ChannelMonitor`]s from the store.
@@ -718,6 +750,29 @@
self.persist_new_channel(funding_txo, monitor, monitor_update_call_id)
}
}

fn archive_persisted_channel(&self, funding_txo: OutPoint) {
let monitor_name = MonitorName::from(funding_txo);
let monitor = match self.read_monitor(&monitor_name) {
Ok((_block_hash, monitor)) => monitor,
Err(_) => return
};
match self.kv_store.write(
ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE,
monitor_name.as_str(),
&monitor.encode()
) {
Ok(()) => {},
Err(_e) => return,
};
let _ = self.kv_store.remove(
CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE,
monitor_name.as_str(),
true,
);
}
}

impl<K: Deref, L: Deref, ES: Deref, SP: Deref> MonitorUpdatingPersister<K, L, ES, SP>
20 changes: 20 additions & 0 deletions lightning/src/util/test_utils.rs
@@ -504,6 +504,10 @@ impl<Signer: sign::ecdsa::WriteableEcdsaChannelSigner> chainmonitor::Persist<Sig
}
res
}

fn archive_persisted_channel(&self, funding_txo: OutPoint) {
<TestPersister as chainmonitor::Persist<TestChannelSigner>>::archive_persisted_channel(&self.persister, funding_txo);
}
}

pub struct TestPersister {
@@ -552,6 +556,18 @@ impl<Signer: sign::ecdsa::WriteableEcdsaChannelSigner> chainmonitor::Persist<Sig
}
ret
}

fn archive_persisted_channel(&self, funding_txo: OutPoint) {
// remove the channel from the offchain_monitor_updates map
match self.offchain_monitor_updates.lock().unwrap().remove(&funding_txo) {
Some(_) => {},
None => {
// If the channel was not in the offchain_monitor_updates map, it should be in the
// chain_sync_monitor_persistences map.
assert!(self.chain_sync_monitor_persistences.lock().unwrap().remove(&funding_txo).is_some());
}
};
}
}

pub struct TestStore {
@@ -1363,6 +1379,10 @@ impl TestChainSource {
watched_outputs: Mutex::new(new_hash_set()),
}
}
pub fn remove_watched_txn_and_outputs(&self, outpoint: OutPoint, script_pubkey: ScriptBuf) {
self.watched_outputs.lock().unwrap().remove(&(outpoint, script_pubkey.clone()));
self.watched_txn.lock().unwrap().remove(&(outpoint.txid, script_pubkey));
}
}

impl UtxoLookup for TestChainSource {