diff --git a/fuzz/src/utils/test_persister.rs b/fuzz/src/utils/test_persister.rs index 89de25aa5e6..f9a03d16178 100644 --- a/fuzz/src/utils/test_persister.rs +++ b/fuzz/src/utils/test_persister.rs @@ -17,4 +17,7 @@ impl chainmonitor::Persist for TestPersister { fn update_persisted_channel(&self, _funding_txo: OutPoint, _update: Option<&channelmonitor::ChannelMonitorUpdate>, _data: &channelmonitor::ChannelMonitor, _update_id: MonitorUpdateId) -> chain::ChannelMonitorUpdateStatus { self.update_ret.lock().unwrap().clone() } + + fn archive_persisted_channel(&self, _: OutPoint) { + } } diff --git a/lightning/src/chain/chainmonitor.rs b/lightning/src/chain/chainmonitor.rs index 015b3dacfc3..efea0cf03f7 100644 --- a/lightning/src/chain/chainmonitor.rs +++ b/lightning/src/chain/chainmonitor.rs @@ -194,6 +194,11 @@ pub trait Persist { /// /// [`Writeable::write`]: crate::util::ser::Writeable::write fn update_persisted_channel(&self, channel_funding_outpoint: OutPoint, update: Option<&ChannelMonitorUpdate>, data: &ChannelMonitor, update_id: MonitorUpdateId) -> ChannelMonitorUpdateStatus; + /// Prevents the channel monitor from being loaded on startup. + /// + /// Archiving the data in a backup location (rather than deleting it fully) is useful for + /// hedging against data loss in case of unexpected failure. + fn archive_persisted_channel(&self, channel_funding_outpoint: OutPoint); } struct MonitorHolder { @@ -656,6 +661,41 @@ where C::Target: chain::Filter, } } } + + /// Archives fully resolved channel monitors by calling [`Persist::archive_persisted_channel`]. + /// + /// This is useful for pruning fully resolved monitors from the monitor set and primary + /// storage so they are not kept in memory and reloaded on restart. + /// + /// Should be called occasionally (once every handful of blocks or on startup). + /// + /// Depending on the implementation of [`Persist::archive_persisted_channel`] the monitor + /// data could be moved to an archive location or removed entirely. + pub fn archive_fully_resolved_channel_monitors(&self) { + let mut have_monitors_to_prune = false; + for (_, monitor_holder) in self.monitors.read().unwrap().iter() { + let logger = WithChannelMonitor::from(&self.logger, &monitor_holder.monitor); + if monitor_holder.monitor.is_fully_resolved(&logger) { + have_monitors_to_prune = true; + } + } + if have_monitors_to_prune { + let mut monitors = self.monitors.write().unwrap(); + monitors.retain(|funding_txo, monitor_holder| { + let logger = WithChannelMonitor::from(&self.logger, &monitor_holder.monitor); + if monitor_holder.monitor.is_fully_resolved(&logger) { + log_info!(logger, + "Archiving fully resolved ChannelMonitor for funding txo {}", + funding_txo + ); + self.persister.archive_persisted_channel(*funding_txo); + false + } else { + true + } + }); + } + } } impl diff --git a/lightning/src/chain/channelmonitor.rs b/lightning/src/chain/channelmonitor.rs index c2d3a43b1f4..c05b6b30922 100644 --- a/lightning/src/chain/channelmonitor.rs +++ b/lightning/src/chain/channelmonitor.rs @@ -935,6 +935,9 @@ pub(crate) struct ChannelMonitorImpl { /// Ordering of tuple data: (their_per_commitment_point, feerate_per_kw, to_broadcaster_sats, /// to_countersignatory_sats) initial_counterparty_commitment_info: Option<(PublicKey, u32, u64, u64)>, + + /// The first block height at which we had no remaining claimable balances. + balances_empty_height: Option, } /// Transaction outputs to watch for on-chain spends. @@ -1145,6 +1148,7 @@ impl Writeable for ChannelMonitorImpl ChannelMonitor { best_block, counterparty_node_id: Some(counterparty_node_id), initial_counterparty_commitment_info: None, + balances_empty_height: None, }) } @@ -1856,6 +1861,52 @@ impl ChannelMonitor { spendable_outputs } + /// Checks if the monitor is fully resolved. Resolved monitor is one that has claimed all of + /// its outputs and balances (i.e. [`Self::get_claimable_balances`] returns an empty set). + /// + /// This function returns true only if [`Self::get_claimable_balances`] has been empty for at least + /// 2016 blocks as an additional protection against any bugs resulting in spuriously empty balance sets. + pub fn is_fully_resolved(&self, logger: &L) -> bool { + let mut is_all_funds_claimed = self.get_claimable_balances().is_empty(); + let current_height = self.current_best_block().height; + let mut inner = self.inner.lock().unwrap(); + + if is_all_funds_claimed { + if !inner.funding_spend_seen { + debug_assert!(false, "We should see funding spend by the time a monitor clears out"); + is_all_funds_claimed = false; + } + } + + match (inner.balances_empty_height, is_all_funds_claimed) { + (Some(balances_empty_height), true) => { + // Claimed all funds, check if reached the blocks threshold. + const BLOCKS_THRESHOLD: u32 = 4032; // ~four weeks + return current_height >= balances_empty_height + BLOCKS_THRESHOLD; + }, + (Some(_), false) => { + // previously assumed we claimed all funds, but we have new funds to claim. + // Should not happen in practice. + debug_assert!(false, "Thought we were done claiming funds, but claimable_balances now has entries"); + log_error!(logger, + "WARNING: LDK thought it was done claiming all the available funds in the ChannelMonitor for channel {}, but later decided it had more to claim. This is potentially an important bug in LDK, please report it at https://github.com/lightningdevkit/rust-lightning/issues/new", + inner.get_funding_txo().0); + inner.balances_empty_height = None; + false + }, + (None, true) => { + // Claimed all funds but `balances_empty_height` is None. It is set to the + // current block height. + inner.balances_empty_height = Some(current_height); + false + }, + (None, false) => { + // Have funds to claim. + false + }, + } + } + #[cfg(test)] pub fn get_counterparty_payment_script(&self) -> ScriptBuf { self.inner.lock().unwrap().counterparty_payment_script.clone() @@ -4632,6 +4683,7 @@ impl<'a, 'b, ES: EntropySource, SP: SignerProvider> ReadableArgs<(&'a ES, &'b SP let mut spendable_txids_confirmed = Some(Vec::new()); let mut counterparty_fulfilled_htlcs = Some(new_hash_map()); let mut initial_counterparty_commitment_info = None; + let mut balances_empty_height = None; let mut channel_id = None; read_tlv_fields!(reader, { (1, funding_spend_confirmed, option), @@ -4644,6 +4696,7 @@ impl<'a, 'b, ES: EntropySource, SP: SignerProvider> ReadableArgs<(&'a ES, &'b SP (15, counterparty_fulfilled_htlcs, option), (17, initial_counterparty_commitment_info, option), (19, channel_id, option), + (21, balances_empty_height, option), }); // `HolderForceClosedWithInfo` replaced `HolderForceClosed` in v0.0.122. If we have both @@ -4722,6 +4775,7 @@ impl<'a, 'b, ES: EntropySource, SP: SignerProvider> ReadableArgs<(&'a ES, &'b SP best_block, counterparty_node_id, initial_counterparty_commitment_info, + balances_empty_height, }))) } } diff --git a/lightning/src/ln/monitor_tests.rs b/lightning/src/ln/monitor_tests.rs index d5f0dc153fc..1c6c4c0e994 100644 --- a/lightning/src/ln/monitor_tests.rs +++ b/lightning/src/ln/monitor_tests.rs @@ -158,6 +158,60 @@ fn revoked_output_htlc_resolution_timing() { expect_payment_failed!(nodes[1], payment_hash_1, false); } +#[test] +fn archive_fully_resolved_monitors() { + // Test we can archive fully resolved channel monitor. + let chanmon_cfgs = create_chanmon_cfgs(2); + let node_cfgs = create_node_cfgs(2, &chanmon_cfgs); + let mut user_config = test_default_channel_config(); + let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[Some(user_config), Some(user_config)]); + let mut nodes = create_network(2, &node_cfgs, &node_chanmgrs); + + let (_, _, chan_id, funding_tx) = + create_announced_chan_between_nodes_with_value(&nodes, 0, 1, 1_000_000, 1_000_000); + + nodes[0].node.close_channel(&chan_id, &nodes[1].node.get_our_node_id()).unwrap(); + let node_0_shutdown = get_event_msg!(nodes[0], MessageSendEvent::SendShutdown, nodes[1].node.get_our_node_id()); + nodes[1].node.handle_shutdown(&nodes[0].node.get_our_node_id(), &node_0_shutdown); + let node_1_shutdown = get_event_msg!(nodes[1], MessageSendEvent::SendShutdown, nodes[0].node.get_our_node_id()); + nodes[0].node.handle_shutdown(&nodes[1].node.get_our_node_id(), &node_1_shutdown); + + let node_0_closing_signed = get_event_msg!(nodes[0], MessageSendEvent::SendClosingSigned, nodes[1].node.get_our_node_id()); + nodes[1].node.handle_closing_signed(&nodes[0].node.get_our_node_id(), &node_0_closing_signed); + let node_1_closing_signed = get_event_msg!(nodes[1], MessageSendEvent::SendClosingSigned, nodes[0].node.get_our_node_id()); + nodes[0].node.handle_closing_signed(&nodes[1].node.get_our_node_id(), &node_1_closing_signed); + let (_, node_0_2nd_closing_signed) = get_closing_signed_broadcast!(nodes[0].node, nodes[1].node.get_our_node_id()); + nodes[1].node.handle_closing_signed(&nodes[0].node.get_our_node_id(), &node_0_2nd_closing_signed.unwrap()); + let (_, _) = get_closing_signed_broadcast!(nodes[1].node, nodes[0].node.get_our_node_id()); + + let shutdown_tx = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().split_off(0); + + mine_transaction(&nodes[0], &shutdown_tx[0]); + mine_transaction(&nodes[1], &shutdown_tx[0]); + + connect_blocks(&nodes[0], 6); + connect_blocks(&nodes[1], 6); + + check_closed_event!(nodes[0], 1, ClosureReason::LocallyInitiatedCooperativeClosure, [nodes[1].node.get_our_node_id()], 1000000); + check_closed_event!(nodes[1], 1, ClosureReason::CounterpartyInitiatedCooperativeClosure, [nodes[0].node.get_our_node_id()], 1000000); + + assert_eq!(nodes[0].chain_monitor.chain_monitor.list_monitors().len(), 1); + // First archive should set balances_empty_height to current block height + nodes[0].chain_monitor.chain_monitor.archive_fully_resolved_channel_monitors(); + assert_eq!(nodes[0].chain_monitor.chain_monitor.list_monitors().len(), 1); + connect_blocks(&nodes[0], 4032); + // Second call after 4032 blocks, should archive the monitor + nodes[0].chain_monitor.chain_monitor.archive_fully_resolved_channel_monitors(); + // Should have no monitors left + assert_eq!(nodes[0].chain_monitor.chain_monitor.list_monitors().len(), 0); + // Remove the corresponding outputs and transactions the chain source is + // watching. This is to make sure the `Drop` function assertions pass. + nodes.get_mut(0).unwrap().chain_source.remove_watched_txn_and_outputs( + OutPoint { txid: funding_tx.txid(), index: 0 }, + funding_tx.output[0].script_pubkey.clone() + ); +} + fn do_chanmon_claim_value_coop_close(anchors: bool) { // Tests `get_claimable_balances` returns the correct values across a simple cooperative claim. // Specifically, this tests that the channel non-HTLC balances show up in diff --git a/lightning/src/util/persist.rs b/lightning/src/util/persist.rs index 6fd0048daf7..3f918935f16 100644 --- a/lightning/src/util/persist.rs +++ b/lightning/src/util/persist.rs @@ -56,6 +56,11 @@ pub const CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE: &str = ""; /// The primary namespace under which [`ChannelMonitorUpdate`]s will be persisted. pub const CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE: &str = "monitor_updates"; +/// The primary namespace under which archived [`ChannelMonitor`]s will be persisted. +pub const ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE: &str = "archived_monitors"; +/// The secondary namespace under which archived [`ChannelMonitor`]s will be persisted. +pub const ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE: &str = ""; + /// The primary namespace under which the [`NetworkGraph`] will be persisted. pub const NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE: &str = ""; /// The secondary namespace under which the [`NetworkGraph`] will be persisted. @@ -212,6 +217,33 @@ impl Persist chain::ChannelMonitorUpdateStatus::UnrecoverableError } } + + fn archive_persisted_channel(&self, funding_txo: OutPoint) { + let monitor_name = MonitorName::from(funding_txo); + let monitor = match self.read( + CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE, + monitor_name.as_str(), + ) { + Ok(monitor) => monitor, + Err(_) => return + }; + match self.write( + ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE, + ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE, + monitor_name.as_str(), + &monitor, + ) { + Ok(()) => {} + Err(_e) => return + }; + let _ = self.remove( + CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE, + monitor_name.as_str(), + true, + ); + } } /// Read previously persisted [`ChannelMonitor`]s from the store. @@ -718,6 +750,29 @@ where self.persist_new_channel(funding_txo, monitor, monitor_update_call_id) } } + + fn archive_persisted_channel(&self, funding_txo: OutPoint) { + let monitor_name = MonitorName::from(funding_txo); + let monitor = match self.read_monitor(&monitor_name) { + Ok((_block_hash, monitor)) => monitor, + Err(_) => return + }; + match self.kv_store.write( + ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE, + ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE, + monitor_name.as_str(), + &monitor.encode() + ) { + Ok(()) => {}, + Err(_e) => return, + }; + let _ = self.kv_store.remove( + CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE, + monitor_name.as_str(), + true, + ); + } } impl MonitorUpdatingPersister diff --git a/lightning/src/util/test_utils.rs b/lightning/src/util/test_utils.rs index 305dc40a0a2..36018b23f79 100644 --- a/lightning/src/util/test_utils.rs +++ b/lightning/src/util/test_utils.rs @@ -504,6 +504,10 @@ impl chainmonitor::Persist>::archive_persisted_channel(&self.persister, funding_txo); + } } pub struct TestPersister { @@ -552,6 +556,18 @@ impl chainmonitor::Persist {}, + None => { + // If the channel was not in the offchain_monitor_updates map, it should be in the + // chain_sync_monitor_persistences map. + assert!(self.chain_sync_monitor_persistences.lock().unwrap().remove(&funding_txo).is_some()); + } + }; + } } pub struct TestStore { @@ -1363,6 +1379,10 @@ impl TestChainSource { watched_outputs: Mutex::new(new_hash_set()), } } + pub fn remove_watched_txn_and_outputs(&self, outpoint: OutPoint, script_pubkey: ScriptBuf) { + self.watched_outputs.lock().unwrap().remove(&(outpoint, script_pubkey.clone())); + self.watched_txn.lock().unwrap().remove(&(outpoint.txid, script_pubkey)); + } } impl UtxoLookup for TestChainSource {