diff --git a/lightning/src/chain/channelmonitor.rs b/lightning/src/chain/channelmonitor.rs index a664c7c794e..cd18ffad12d 100644 --- a/lightning/src/chain/channelmonitor.rs +++ b/lightning/src/chain/channelmonitor.rs @@ -37,7 +37,7 @@ use crate::ln::{PaymentHash, PaymentPreimage}; use crate::ln::msgs::DecodeError; use crate::ln::chan_utils; use crate::ln::chan_utils::{CounterpartyCommitmentSecrets, HTLCOutputInCommitment, HTLCClaim, ChannelTransactionParameters, HolderCommitmentTransaction}; -use crate::ln::channelmanager::HTLCSource; +use crate::ln::channelmanager::{HTLCSource, SentHTLCId}; use crate::chain; use crate::chain::{BestBlock, WatchedOutput}; use crate::chain::chaininterface::{BroadcasterInterface, FeeEstimator, LowerBoundedFeeEstimator}; @@ -494,6 +494,7 @@ pub(crate) enum ChannelMonitorUpdateStep { LatestHolderCommitmentTXInfo { commitment_tx: HolderCommitmentTransaction, htlc_outputs: Vec<(HTLCOutputInCommitment, Option, Option)>, + claimed_htlcs: Vec<(SentHTLCId, PaymentPreimage)>, }, LatestCounterpartyCommitmentTXInfo { commitment_txid: Txid, @@ -536,6 +537,7 @@ impl ChannelMonitorUpdateStep { impl_writeable_tlv_based_enum_upgradable!(ChannelMonitorUpdateStep, (0, LatestHolderCommitmentTXInfo) => { (0, commitment_tx, required), + (1, claimed_htlcs, vec_type), (2, htlc_outputs, vec_type), }, (1, LatestCounterpartyCommitmentTXInfo) => { @@ -750,6 +752,8 @@ pub(crate) struct ChannelMonitorImpl { /// Serialized to disk but should generally not be sent to Watchtowers. counterparty_hash_commitment_number: HashMap, + counterparty_fulfilled_htlcs: HashMap, + // We store two holder commitment transactions to avoid any race conditions where we may update // some monitors (potentially on watchtowers) but then fail to update others, resulting in the // various monitors for one channel being out of sync, and us broadcasting a holder @@ -1033,6 +1037,7 @@ impl Writeable for ChannelMonitorImpl ChannelMonitor { counterparty_claimable_outpoints: HashMap::new(), counterparty_commitment_txn_on_chain: HashMap::new(), counterparty_hash_commitment_number: HashMap::new(), + counterparty_fulfilled_htlcs: HashMap::new(), prev_holder_signed_commitment_tx: None, current_holder_commitment_tx: holder_commitment_tx, @@ -1174,7 +1180,7 @@ impl ChannelMonitor { &self, holder_commitment_tx: HolderCommitmentTransaction, htlc_outputs: Vec<(HTLCOutputInCommitment, Option, Option)>, ) -> Result<(), ()> { - self.inner.lock().unwrap().provide_latest_holder_commitment_tx(holder_commitment_tx, htlc_outputs).map_err(|_| ()) + self.inner.lock().unwrap().provide_latest_holder_commitment_tx(holder_commitment_tx, htlc_outputs, &Vec::new()).map_err(|_| ()) } /// This is used to provide payment preimage(s) out-of-band during startup without updating the @@ -1810,9 +1816,10 @@ impl ChannelMonitor { /// `ChannelMonitor`. This is used to determine if an HTLC was removed from the channel prior /// to the `ChannelManager` having been persisted. /// - /// This is similar to [`Self::get_pending_outbound_htlcs`] except it includes HTLCs which were - /// resolved by this `ChannelMonitor`. - pub(crate) fn get_all_current_outbound_htlcs(&self) -> HashMap { + /// This is similar to [`Self::get_pending_or_resolved_outbound_htlcs`] except it includes + /// HTLCs which were resolved on-chain (i.e. where the final HTLC resolution was done by an + /// event from this `ChannelMonitor`). 
+ pub(crate) fn get_all_current_outbound_htlcs(&self) -> HashMap)> { let mut res = HashMap::new(); // Just examine the available counterparty commitment transactions. See docs on // `fail_unbroadcast_htlcs`, below, for justification. @@ -1822,7 +1829,8 @@ impl ChannelMonitor { if let Some(ref latest_outpoints) = us.counterparty_claimable_outpoints.get($txid) { for &(ref htlc, ref source_option) in latest_outpoints.iter() { if let &Some(ref source) = source_option { - res.insert((**source).clone(), htlc.clone()); + res.insert((**source).clone(), (htlc.clone(), + us.counterparty_fulfilled_htlcs.get(&SentHTLCId::from_source(source)).cloned())); } } } @@ -1837,9 +1845,14 @@ impl ChannelMonitor { res } - /// Gets the set of outbound HTLCs which are pending resolution in this channel. + /// Gets the set of outbound HTLCs which are pending resolution in this channel or which were + /// resolved with a preimage from our counterparty. + /// /// This is used to reconstruct pending outbound payments on restart in the ChannelManager. - pub(crate) fn get_pending_outbound_htlcs(&self) -> HashMap { + /// + /// Currently, the preimage is unused, however if it is present in the relevant internal state + /// an HTLC is always included even if it has been resolved. + pub(crate) fn get_pending_or_resolved_outbound_htlcs(&self) -> HashMap)> { let us = self.inner.lock().unwrap(); // We're only concerned with the confirmation count of HTLC transactions, and don't // actually care how many confirmations a commitment transaction may or may not have. Thus, @@ -1887,8 +1900,10 @@ impl ChannelMonitor { Some(commitment_tx_output_idx) == htlc.transaction_output_index } else { false } }); - if !htlc_update_confd { - res.insert(source.clone(), htlc.clone()); + let counterparty_resolved_preimage_opt = + us.counterparty_fulfilled_htlcs.get(&SentHTLCId::from_source(source)).cloned(); + if !htlc_update_confd || counterparty_resolved_preimage_opt.is_some() { + res.insert(source.clone(), (htlc.clone(), counterparty_resolved_preimage_opt)); } } } @@ -1970,6 +1985,9 @@ macro_rules! fail_unbroadcast_htlcs { } } if matched_htlc { continue; } + if $self.counterparty_fulfilled_htlcs.get(&SentHTLCId::from_source(source)).is_some() { + continue; + } $self.onchain_events_awaiting_threshold_conf.retain(|ref entry| { if entry.height != $commitment_tx_conf_height { return true; } match entry.event { @@ -2041,8 +2059,23 @@ impl ChannelMonitorImpl { // Prune HTLCs from the previous counterparty commitment tx so we don't generate failure/fulfill // events for now-revoked/fulfilled HTLCs. 
if let Some(txid) = self.prev_counterparty_commitment_txid.take() { - for &mut (_, ref mut source) in self.counterparty_claimable_outpoints.get_mut(&txid).unwrap() { - *source = None; + if self.current_counterparty_commitment_txid.unwrap() != txid { + let cur_claimables = self.counterparty_claimable_outpoints.get( + &self.current_counterparty_commitment_txid.unwrap()).unwrap(); + for (_, ref source_opt) in self.counterparty_claimable_outpoints.get(&txid).unwrap() { + if let Some(source) = source_opt { + if !cur_claimables.iter() + .any(|(_, cur_source_opt)| cur_source_opt == source_opt) + { + self.counterparty_fulfilled_htlcs.remove(&SentHTLCId::from_source(source)); + } + } + } + for &mut (_, ref mut source_opt) in self.counterparty_claimable_outpoints.get_mut(&txid).unwrap() { + *source_opt = None; + } + } else { + assert!(cfg!(fuzzing), "Commitment txids are unique outside of fuzzing, where hashes can collide"); } } @@ -2127,28 +2160,37 @@ impl ChannelMonitorImpl { /// is important that any clones of this channel monitor (including remote clones) by kept /// up-to-date as our holder commitment transaction is updated. /// Panics if set_on_holder_tx_csv has never been called. - fn provide_latest_holder_commitment_tx(&mut self, holder_commitment_tx: HolderCommitmentTransaction, htlc_outputs: Vec<(HTLCOutputInCommitment, Option, Option)>) -> Result<(), &'static str> { - // block for Rust 1.34 compat - let mut new_holder_commitment_tx = { - let trusted_tx = holder_commitment_tx.trust(); - let txid = trusted_tx.txid(); - let tx_keys = trusted_tx.keys(); - self.current_holder_commitment_number = trusted_tx.commitment_number(); - HolderSignedTx { - txid, - revocation_key: tx_keys.revocation_key, - a_htlc_key: tx_keys.broadcaster_htlc_key, - b_htlc_key: tx_keys.countersignatory_htlc_key, - delayed_payment_key: tx_keys.broadcaster_delayed_payment_key, - per_commitment_point: tx_keys.per_commitment_point, - htlc_outputs, - to_self_value_sat: holder_commitment_tx.to_broadcaster_value_sat(), - feerate_per_kw: trusted_tx.feerate_per_kw(), - } + fn provide_latest_holder_commitment_tx(&mut self, holder_commitment_tx: HolderCommitmentTransaction, htlc_outputs: Vec<(HTLCOutputInCommitment, Option, Option)>, claimed_htlcs: &[(SentHTLCId, PaymentPreimage)]) -> Result<(), &'static str> { + let trusted_tx = holder_commitment_tx.trust(); + let txid = trusted_tx.txid(); + let tx_keys = trusted_tx.keys(); + self.current_holder_commitment_number = trusted_tx.commitment_number(); + let mut new_holder_commitment_tx = HolderSignedTx { + txid, + revocation_key: tx_keys.revocation_key, + a_htlc_key: tx_keys.broadcaster_htlc_key, + b_htlc_key: tx_keys.countersignatory_htlc_key, + delayed_payment_key: tx_keys.broadcaster_delayed_payment_key, + per_commitment_point: tx_keys.per_commitment_point, + htlc_outputs, + to_self_value_sat: holder_commitment_tx.to_broadcaster_value_sat(), + feerate_per_kw: trusted_tx.feerate_per_kw(), }; self.onchain_tx_handler.provide_latest_holder_tx(holder_commitment_tx); mem::swap(&mut new_holder_commitment_tx, &mut self.current_holder_commitment_tx); self.prev_holder_signed_commitment_tx = Some(new_holder_commitment_tx); + for (claimed_htlc_id, claimed_preimage) in claimed_htlcs { + #[cfg(debug_assertions)] { + let cur_counterparty_htlcs = self.counterparty_claimable_outpoints.get( + &self.current_counterparty_commitment_txid.unwrap()).unwrap(); + assert!(cur_counterparty_htlcs.iter().any(|(_, source_opt)| { + if let Some(source) = source_opt { + SentHTLCId::from_source(source) == 
*claimed_htlc_id + } else { false } + })); + } + self.counterparty_fulfilled_htlcs.insert(*claimed_htlc_id, *claimed_preimage); + } if self.holder_tx_signed { return Err("Latest holder commitment signed has already been signed, update is rejected"); } @@ -2243,10 +2285,10 @@ impl ChannelMonitorImpl { let bounded_fee_estimator = LowerBoundedFeeEstimator::new(&*fee_estimator); for update in updates.updates.iter() { match update { - ChannelMonitorUpdateStep::LatestHolderCommitmentTXInfo { commitment_tx, htlc_outputs } => { + ChannelMonitorUpdateStep::LatestHolderCommitmentTXInfo { commitment_tx, htlc_outputs, claimed_htlcs } => { log_trace!(logger, "Updating ChannelMonitor with latest holder commitment transaction info"); if self.lockdown_from_offchain { panic!(); } - if let Err(e) = self.provide_latest_holder_commitment_tx(commitment_tx.clone(), htlc_outputs.clone()) { + if let Err(e) = self.provide_latest_holder_commitment_tx(commitment_tx.clone(), htlc_outputs.clone(), &claimed_htlcs) { log_error!(logger, "Providing latest holder commitment transaction failed/was refused:"); log_error!(logger, " {}", e); ret = Err(()); @@ -3868,6 +3910,7 @@ impl<'a, 'b, ES: EntropySource, SP: SignerProvider> ReadableArgs<(&'a ES, &'b SP let mut counterparty_node_id = None; let mut confirmed_commitment_tx_counterparty_output = None; let mut spendable_txids_confirmed = Some(Vec::new()); + let mut counterparty_fulfilled_htlcs = Some(HashMap::new()); read_tlv_fields!(reader, { (1, funding_spend_confirmed, option), (3, htlcs_resolved_on_chain, vec_type), @@ -3876,6 +3919,7 @@ impl<'a, 'b, ES: EntropySource, SP: SignerProvider> ReadableArgs<(&'a ES, &'b SP (9, counterparty_node_id, option), (11, confirmed_commitment_tx_counterparty_output, option), (13, spendable_txids_confirmed, vec_type), + (15, counterparty_fulfilled_htlcs, option), }); Ok((best_block.block_hash(), ChannelMonitor::from_impl(ChannelMonitorImpl { @@ -3904,6 +3948,7 @@ impl<'a, 'b, ES: EntropySource, SP: SignerProvider> ReadableArgs<(&'a ES, &'b SP counterparty_claimable_outpoints, counterparty_commitment_txn_on_chain, counterparty_hash_commitment_number, + counterparty_fulfilled_htlcs: counterparty_fulfilled_htlcs.unwrap(), prev_holder_signed_commitment_tx, current_holder_commitment_tx, @@ -4077,7 +4122,6 @@ mod tests { let fee_estimator = TestFeeEstimator { sat_per_kw: Mutex::new(253) }; let dummy_key = PublicKey::from_secret_key(&secp_ctx, &SecretKey::from_slice(&[42; 32]).unwrap()); - let dummy_tx = Transaction { version: 0, lock_time: PackedLockTime::ZERO, input: Vec::new(), output: Vec::new() }; let mut preimages = Vec::new(); { @@ -4167,11 +4211,10 @@ mod tests { HolderCommitmentTransaction::dummy(), best_block, dummy_key); monitor.provide_latest_holder_commitment_tx(HolderCommitmentTransaction::dummy(), preimages_to_holder_htlcs!(preimages[0..10])).unwrap(); - let dummy_txid = dummy_tx.txid(); - monitor.provide_latest_counterparty_commitment_tx(dummy_txid, preimages_slice_to_htlc_outputs!(preimages[5..15]), 281474976710655, dummy_key, &logger); - monitor.provide_latest_counterparty_commitment_tx(dummy_txid, preimages_slice_to_htlc_outputs!(preimages[15..20]), 281474976710654, dummy_key, &logger); - monitor.provide_latest_counterparty_commitment_tx(dummy_txid, preimages_slice_to_htlc_outputs!(preimages[17..20]), 281474976710653, dummy_key, &logger); - monitor.provide_latest_counterparty_commitment_tx(dummy_txid, preimages_slice_to_htlc_outputs!(preimages[18..20]), 281474976710652, dummy_key, &logger); + 
monitor.provide_latest_counterparty_commitment_tx(Txid::from_inner(Sha256::hash(b"1").into_inner()), + preimages_slice_to_htlc_outputs!(preimages[5..15]), 281474976710655, dummy_key, &logger); + monitor.provide_latest_counterparty_commitment_tx(Txid::from_inner(Sha256::hash(b"2").into_inner()), + preimages_slice_to_htlc_outputs!(preimages[15..20]), 281474976710654, dummy_key, &logger); for &(ref preimage, ref hash) in preimages.iter() { let bounded_fee_estimator = LowerBoundedFeeEstimator::new(&fee_estimator); monitor.provide_payment_preimage(hash, preimage, &broadcaster, &bounded_fee_estimator, &logger); @@ -4185,6 +4228,9 @@ mod tests { test_preimages_exist!(&preimages[0..10], monitor); test_preimages_exist!(&preimages[15..20], monitor); + monitor.provide_latest_counterparty_commitment_tx(Txid::from_inner(Sha256::hash(b"3").into_inner()), + preimages_slice_to_htlc_outputs!(preimages[17..20]), 281474976710653, dummy_key, &logger); + // Now provide a further secret, pruning preimages 15-17 secret[0..32].clone_from_slice(&hex::decode("c7518c8ae4660ed02894df8976fa1a3659c1a8b4b5bec0c4b872abeba4cb8964").unwrap()); monitor.provide_secret(281474976710654, secret.clone()).unwrap(); @@ -4192,6 +4238,9 @@ mod tests { test_preimages_exist!(&preimages[0..10], monitor); test_preimages_exist!(&preimages[17..20], monitor); + monitor.provide_latest_counterparty_commitment_tx(Txid::from_inner(Sha256::hash(b"4").into_inner()), + preimages_slice_to_htlc_outputs!(preimages[18..20]), 281474976710652, dummy_key, &logger); + // Now update holder commitment tx info, pruning only element 18 as we still care about the // previous commitment tx's preimages too monitor.provide_latest_holder_commitment_tx(HolderCommitmentTransaction::dummy(), preimages_to_holder_htlcs!(preimages[0..5])).unwrap(); diff --git a/lightning/src/ln/channel.rs b/lightning/src/ln/channel.rs index 2b9920aa2ba..842c9118086 100644 --- a/lightning/src/ln/channel.rs +++ b/lightning/src/ln/channel.rs @@ -27,7 +27,7 @@ use crate::ln::features::{ChannelTypeFeatures, InitFeatures}; use crate::ln::msgs; use crate::ln::msgs::{DecodeError, OptionalField, DataLossProtect}; use crate::ln::script::{self, ShutdownScript}; -use crate::ln::channelmanager::{self, CounterpartyForwardingInfo, PendingHTLCStatus, HTLCSource, HTLCFailureMsg, PendingHTLCInfo, RAACommitmentOrder, BREAKDOWN_TIMEOUT, MIN_CLTV_EXPIRY_DELTA, MAX_LOCAL_BREAKDOWN_TIMEOUT}; +use crate::ln::channelmanager::{self, CounterpartyForwardingInfo, PendingHTLCStatus, HTLCSource, SentHTLCId, HTLCFailureMsg, PendingHTLCInfo, RAACommitmentOrder, BREAKDOWN_TIMEOUT, MIN_CLTV_EXPIRY_DELTA, MAX_LOCAL_BREAKDOWN_TIMEOUT}; use crate::ln::chan_utils::{CounterpartyCommitmentSecrets, TxCreationKeys, HTLCOutputInCommitment, htlc_success_tx_weight, htlc_timeout_tx_weight, make_funding_redeemscript, ChannelPublicKeys, CommitmentTransaction, HolderCommitmentTransaction, ChannelTransactionParameters, CounterpartyChannelTransactionParameters, MAX_HTLCS, get_commitment_transaction_number_obscure_factor, ClosingTransaction}; use crate::ln::chan_utils; use crate::ln::onion_utils::HTLCFailReason; @@ -192,6 +192,7 @@ enum OutboundHTLCState { #[derive(Clone)] enum OutboundHTLCOutcome { + /// LDK version 0.0.105+ will always fill in the preimage here. 
Success(Option), Failure(HTLCFailReason), } @@ -3159,15 +3160,6 @@ impl Channel { } } - self.latest_monitor_update_id += 1; - let mut monitor_update = ChannelMonitorUpdate { - update_id: self.latest_monitor_update_id, - updates: vec![ChannelMonitorUpdateStep::LatestHolderCommitmentTXInfo { - commitment_tx: holder_commitment_tx, - htlc_outputs: htlcs_and_sigs - }] - }; - for htlc in self.pending_inbound_htlcs.iter_mut() { let new_forward = if let &InboundHTLCState::RemoteAnnounced(ref forward_info) = &htlc.state { Some(forward_info.clone()) @@ -3179,6 +3171,7 @@ impl Channel { need_commitment = true; } } + let mut claimed_htlcs = Vec::new(); for htlc in self.pending_outbound_htlcs.iter_mut() { if let &mut OutboundHTLCState::RemoteRemoved(ref mut outcome) = &mut htlc.state { log_trace!(logger, "Updating HTLC {} to AwaitingRemoteRevokeToRemove due to commitment_signed in channel {}.", @@ -3186,11 +3179,30 @@ impl Channel { // Grab the preimage, if it exists, instead of cloning let mut reason = OutboundHTLCOutcome::Success(None); mem::swap(outcome, &mut reason); + if let OutboundHTLCOutcome::Success(Some(preimage)) = reason { + // If a user (a) receives an HTLC claim using LDK 0.0.104 or before, then (b) + // upgrades to LDK 0.0.114 or later before the HTLC is fully resolved, we could + // have a `Success(None)` reason. In this case we could forget some HTLC + // claims, but such an upgrade is unlikely and including claimed HTLCs here + // fixes a bug which the user was exposed to on 0.0.104 when they started the + // claim anyway. + claimed_htlcs.push((SentHTLCId::from_source(&htlc.source), preimage)); + } htlc.state = OutboundHTLCState::AwaitingRemoteRevokeToRemove(reason); need_commitment = true; } } + self.latest_monitor_update_id += 1; + let mut monitor_update = ChannelMonitorUpdate { + update_id: self.latest_monitor_update_id, + updates: vec![ChannelMonitorUpdateStep::LatestHolderCommitmentTXInfo { + commitment_tx: holder_commitment_tx, + htlc_outputs: htlcs_and_sigs, + claimed_htlcs, + }] + }; + self.cur_holder_commitment_transaction_number -= 1; // Note that if we need_commitment & !AwaitingRemoteRevoke we'll call // build_commitment_no_status_check() next which will reset this to RAAFirst. diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index 0757e117ce2..06d679ab85d 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -234,6 +234,36 @@ impl Readable for InterceptId { Ok(InterceptId(buf)) } } + +#[derive(Clone, Copy, PartialEq, Eq, Hash)] +/// Uniquely describes an HTLC by its source. Just the guaranteed-unique subset of [`HTLCSource`]. +pub(crate) enum SentHTLCId { + PreviousHopData { short_channel_id: u64, htlc_id: u64 }, + OutboundRoute { session_priv: SecretKey }, +} +impl SentHTLCId { + pub(crate) fn from_source(source: &HTLCSource) -> Self { + match source { + HTLCSource::PreviousHopData(hop_data) => Self::PreviousHopData { + short_channel_id: hop_data.short_channel_id, + htlc_id: hop_data.htlc_id, + }, + HTLCSource::OutboundRoute { session_priv, .. 
} => + Self::OutboundRoute { session_priv: *session_priv }, + } + } +} +impl_writeable_tlv_based_enum!(SentHTLCId, + (0, PreviousHopData) => { + (0, short_channel_id, required), + (2, htlc_id, required), + }, + (2, OutboundRoute) => { + (0, session_priv, required), + }; +); + + /// Tracks the inbound corresponding to an outbound HTLC #[allow(clippy::derive_hash_xor_eq)] // Our Hash is faithful to the data, we just don't have SecretKey::hash #[derive(Clone, PartialEq, Eq)] @@ -7445,6 +7475,10 @@ where probing_cookie_secret = Some(args.entropy_source.get_secure_random_bytes()); } + if !channel_closures.is_empty() { + pending_events_read.append(&mut channel_closures); + } + if pending_outbound_payments.is_none() && pending_outbound_payments_no_retry.is_none() { pending_outbound_payments = Some(pending_outbound_payments_compat); } else if pending_outbound_payments.is_none() { @@ -7453,7 +7487,13 @@ where outbounds.insert(id, PendingOutboundPayment::Legacy { session_privs }); } pending_outbound_payments = Some(outbounds); - } else { + } + let pending_outbounds = OutboundPayments { + pending_outbound_payments: Mutex::new(pending_outbound_payments.unwrap()), + retry_lock: Mutex::new(()) + }; + + { // If we're tracking pending payments, ensure we haven't lost any by looking at the // ChannelMonitor data for any channels for which we do not have authorative state // (i.e. those for which we just force-closed above or we otherwise don't have a @@ -7464,16 +7504,17 @@ where // 0.0.102+ for (_, monitor) in args.channel_monitors.iter() { if id_to_peer.get(&monitor.get_funding_txo().0.to_channel_id()).is_none() { - for (htlc_source, htlc) in monitor.get_pending_outbound_htlcs() { + for (htlc_source, (htlc, _)) in monitor.get_pending_or_resolved_outbound_htlcs() { if let HTLCSource::OutboundRoute { payment_id, session_priv, path, payment_secret, .. } = htlc_source { if path.is_empty() { log_error!(args.logger, "Got an empty path for a pending payment"); return Err(DecodeError::InvalidValue); } + let path_amt = path.last().unwrap().fee_msat; let mut session_priv_bytes = [0; 32]; session_priv_bytes[..].copy_from_slice(&session_priv[..]); - match pending_outbound_payments.as_mut().unwrap().entry(payment_id) { + match pending_outbounds.pending_outbound_payments.lock().unwrap().entry(payment_id) { hash_map::Entry::Occupied(mut entry) => { let newly_added = entry.get_mut().insert(session_priv_bytes, &path); log_info!(args.logger, "{} a pending payment path for {} msat for session priv {} on an existing pending payment with payment hash {}", @@ -7500,51 +7541,64 @@ where } } } - for (htlc_source, htlc) in monitor.get_all_current_outbound_htlcs() { - if let HTLCSource::PreviousHopData(prev_hop_data) = htlc_source { - let pending_forward_matches_htlc = |info: &PendingAddHTLCInfo| { - info.prev_funding_outpoint == prev_hop_data.outpoint && - info.prev_htlc_id == prev_hop_data.htlc_id - }; - // The ChannelMonitor is now responsible for this HTLC's - // failure/success and will let us know what its outcome is. If we - // still have an entry for this HTLC in `forward_htlcs` or - // `pending_intercepted_htlcs`, we were apparently not persisted after - // the monitor was when forwarding the payment. 
- forward_htlcs.retain(|_, forwards| { - forwards.retain(|forward| { - if let HTLCForwardInfo::AddHTLC(htlc_info) = forward { - if pending_forward_matches_htlc(&htlc_info) { - log_info!(args.logger, "Removing pending to-forward HTLC with hash {} as it was forwarded to the closed channel {}", - log_bytes!(htlc.payment_hash.0), log_bytes!(monitor.get_funding_txo().0.to_channel_id())); - false + for (htlc_source, (htlc, preimage_opt)) in monitor.get_all_current_outbound_htlcs() { + match htlc_source { + HTLCSource::PreviousHopData(prev_hop_data) => { + let pending_forward_matches_htlc = |info: &PendingAddHTLCInfo| { + info.prev_funding_outpoint == prev_hop_data.outpoint && + info.prev_htlc_id == prev_hop_data.htlc_id + }; + // The ChannelMonitor is now responsible for this HTLC's + // failure/success and will let us know what its outcome is. If we + // still have an entry for this HTLC in `forward_htlcs` or + // `pending_intercepted_htlcs`, we were apparently not persisted after + // the monitor was when forwarding the payment. + forward_htlcs.retain(|_, forwards| { + forwards.retain(|forward| { + if let HTLCForwardInfo::AddHTLC(htlc_info) = forward { + if pending_forward_matches_htlc(&htlc_info) { + log_info!(args.logger, "Removing pending to-forward HTLC with hash {} as it was forwarded to the closed channel {}", + log_bytes!(htlc.payment_hash.0), log_bytes!(monitor.get_funding_txo().0.to_channel_id())); + false + } else { true } } else { true } + }); + !forwards.is_empty() + }); + pending_intercepted_htlcs.as_mut().unwrap().retain(|intercepted_id, htlc_info| { + if pending_forward_matches_htlc(&htlc_info) { + log_info!(args.logger, "Removing pending intercepted HTLC with hash {} as it was forwarded to the closed channel {}", + log_bytes!(htlc.payment_hash.0), log_bytes!(monitor.get_funding_txo().0.to_channel_id())); + pending_events_read.retain(|event| { + if let Event::HTLCIntercepted { intercept_id: ev_id, .. } = event { + intercepted_id != ev_id + } else { true } + }); + false } else { true } }); - !forwards.is_empty() - }); - pending_intercepted_htlcs.as_mut().unwrap().retain(|intercepted_id, htlc_info| { - if pending_forward_matches_htlc(&htlc_info) { - log_info!(args.logger, "Removing pending intercepted HTLC with hash {} as it was forwarded to the closed channel {}", - log_bytes!(htlc.payment_hash.0), log_bytes!(monitor.get_funding_txo().0.to_channel_id())); - pending_events_read.retain(|event| { - if let Event::HTLCIntercepted { intercept_id: ev_id, .. } = event { - intercepted_id != ev_id - } else { true } - }); - false - } else { true } - }); + }, + HTLCSource::OutboundRoute { payment_id, session_priv, path, .. } => { + if let Some(preimage) = preimage_opt { + let pending_events = Mutex::new(pending_events_read); + // Note that we set `from_onchain` to "false" here, + // deliberately keeping the pending payment around forever. + // Given it should only occur when we have a channel we're + // force-closing for being stale that's okay. + // The alternative would be to wipe the state when claiming, + // generating a `PaymentPathSuccessful` event but regenerating + // it and the `PaymentSent` on every restart until the + // `ChannelMonitor` is removed. 
+ pending_outbounds.claim_htlc(payment_id, preimage, session_priv, path, false, &pending_events, &args.logger); + pending_events_read = pending_events.into_inner().unwrap(); + } + }, } } } } } - let pending_outbounds = OutboundPayments { - pending_outbound_payments: Mutex::new(pending_outbound_payments.unwrap()), - retry_lock: Mutex::new(()) - }; if !forward_htlcs.is_empty() || pending_outbounds.needs_abandon() { // If we have pending HTLCs to forward, assume we either dropped a // `PendingHTLCsForwardable` or the user received it but never processed it as they @@ -7602,10 +7656,6 @@ where let mut secp_ctx = Secp256k1::new(); secp_ctx.seeded_randomize(&args.entropy_source.get_secure_random_bytes()); - if !channel_closures.is_empty() { - pending_events_read.append(&mut channel_closures); - } - let our_network_pubkey = match args.node_signer.get_node_id(Recipient::Node) { Ok(key) => key, Err(()) => return Err(DecodeError::InvalidValue) diff --git a/lightning/src/ln/payment_tests.rs b/lightning/src/ln/payment_tests.rs index c4cd0fc1b09..15361b98ad7 100644 --- a/lightning/src/ln/payment_tests.rs +++ b/lightning/src/ln/payment_tests.rs @@ -2748,3 +2748,83 @@ fn test_threaded_payment_retries() { } } } + +fn do_no_missing_sent_on_midpoint_reload(persist_manager_with_payment: bool) { + // Test that if we reload in the middle of an HTLC claim commitment signed dance we'll still + // receive the PaymentSent event even if the ChannelManager had no idea about the payment when + // it was last persisted. + let chanmon_cfgs = create_chanmon_cfgs(2); + let node_cfgs = create_node_cfgs(2, &chanmon_cfgs); + let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]); + let (persister_a, persister_b, persister_c); + let (chain_monitor_a, chain_monitor_b, chain_monitor_c); + let (nodes_0_deserialized, nodes_0_deserialized_b, nodes_0_deserialized_c); + let mut nodes = create_network(2, &node_cfgs, &node_chanmgrs); + + let chan_id = create_announced_chan_between_nodes(&nodes, 0, 1).2; + + let mut nodes_0_serialized = Vec::new(); + if !persist_manager_with_payment { + nodes_0_serialized = nodes[0].node.encode(); + } + + let (our_payment_preimage, our_payment_hash, _) = route_payment(&nodes[0], &[&nodes[1]], 1_000_000); + + if persist_manager_with_payment { + nodes_0_serialized = nodes[0].node.encode(); + } + + nodes[1].node.claim_funds(our_payment_preimage); + check_added_monitors!(nodes[1], 1); + expect_payment_claimed!(nodes[1], our_payment_hash, 1_000_000); + + let updates = get_htlc_update_msgs!(nodes[1], nodes[0].node.get_our_node_id()); + nodes[0].node.handle_update_fulfill_htlc(&nodes[1].node.get_our_node_id(), &updates.update_fulfill_htlcs[0]); + nodes[0].node.handle_commitment_signed(&nodes[1].node.get_our_node_id(), &updates.commitment_signed); + check_added_monitors!(nodes[0], 1); + + // The ChannelMonitor should always be the latest version, as we're required to persist it + // during the commitment signed handling. + let chan_0_monitor_serialized = get_monitor!(nodes[0], chan_id).encode(); + reload_node!(nodes[0], test_default_channel_config(), &nodes_0_serialized, &[&chan_0_monitor_serialized], persister_a, chain_monitor_a, nodes_0_deserialized); + + let events = nodes[0].node.get_and_clear_pending_events(); + assert_eq!(events.len(), 2); + if let Event::ChannelClosed { reason: ClosureReason::OutdatedChannelManager, .. } = events[0] {} else { panic!(); } + if let Event::PaymentSent { payment_preimage, .. 
} = events[1] { assert_eq!(payment_preimage, our_payment_preimage); } else { panic!(); }
+    // Note that we don't get a PaymentPathSuccessful here as we leave the HTLC pending to avoid
+    // the double-claim that would otherwise appear at the end of this test.
+    let as_broadcasted_txn = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().split_off(0);
+    assert_eq!(as_broadcasted_txn.len(), 1);
+
+    // Ensure that, even after some time, if we restart we still include *something* in the current
+    // `ChannelManager` which prevents a `PaymentFailed` when we restart even if pending resolved
+    // payments have since been timed out thanks to `IDEMPOTENCY_TIMEOUT_TICKS`.
+    // A naive implementation of the fix here would wipe the pending payments set, causing a
+    // failure event when we restart.
+    for _ in 0..(IDEMPOTENCY_TIMEOUT_TICKS * 2) { nodes[0].node.timer_tick_occurred(); }
+
+    let chan_0_monitor_serialized = get_monitor!(nodes[0], chan_id).encode();
+    reload_node!(nodes[0], test_default_channel_config(), &nodes[0].node.encode(), &[&chan_0_monitor_serialized], persister_b, chain_monitor_b, nodes_0_deserialized_b);
+    let events = nodes[0].node.get_and_clear_pending_events();
+    assert!(events.is_empty());
+
+    // Ensure that we don't generate any further events even after the channel-closing commitment
+    // transaction is confirmed on-chain.
+    confirm_transaction(&nodes[0], &as_broadcasted_txn[0]);
+    for _ in 0..(IDEMPOTENCY_TIMEOUT_TICKS * 2) { nodes[0].node.timer_tick_occurred(); }
+
+    let events = nodes[0].node.get_and_clear_pending_events();
+    assert!(events.is_empty());
+
+    let chan_0_monitor_serialized = get_monitor!(nodes[0], chan_id).encode();
+    reload_node!(nodes[0], test_default_channel_config(), &nodes[0].node.encode(), &[&chan_0_monitor_serialized], persister_c, chain_monitor_c, nodes_0_deserialized_c);
+    let events = nodes[0].node.get_and_clear_pending_events();
+    assert!(events.is_empty());
+}
+
+#[test]
+fn no_missing_sent_on_midpoint_reload() {
+    do_no_missing_sent_on_midpoint_reload(false);
+    do_no_missing_sent_on_midpoint_reload(true);
+}
diff --git a/lightning/src/sync/debug_sync.rs b/lightning/src/sync/debug_sync.rs
index 5b6acbcadd5..dd106a9e897 100644
--- a/lightning/src/sync/debug_sync.rs
+++ b/lightning/src/sync/debug_sync.rs
@@ -201,6 +201,11 @@ pub struct Mutex<T: Sized> {
     inner: StdMutex<T>,
     deps: Arc<LockMetadata>,
 }
+impl<T: Sized> Mutex<T> {
+    pub(crate) fn into_inner(self) -> LockResult<T> {
+        self.inner.into_inner().map_err(|_| ())
+    }
+}
 
 #[must_use = "if unused the Mutex will immediately unlock"]
 pub struct MutexGuard<'a, T: Sized + 'a> {
diff --git a/lightning/src/sync/nostd_sync.rs b/lightning/src/sync/nostd_sync.rs
index 858f60db5b5..17307997d81 100644
--- a/lightning/src/sync/nostd_sync.rs
+++ b/lightning/src/sync/nostd_sync.rs
@@ -60,6 +60,10 @@ impl<T> Mutex<T> {
     pub fn try_lock<'a>(&'a self) -> LockResult<MutexGuard<'a, T>> {
         Ok(MutexGuard { lock: self.inner.borrow_mut() })
     }
+
+    pub fn into_inner(self) -> LockResult<T> {
+        Ok(self.inner.into_inner())
+    }
 }
 
 impl<'a, T: 'a> LockTestExt<'a> for Mutex<T> {
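
The core mechanism above, stripped of LDK types, is a map from a stable HTLC identifier to the preimage the counterparty handed us, persisted with the `ChannelMonitor` and replayed against a possibly-stale `ChannelManager` on reload. Below is a minimal sketch of that bookkeeping; `SentHtlcId`, `Preimage`, `MonitorState` and `replay_claims` are illustrative stand-ins for this note, not LDK API.

use std::collections::{HashMap, HashSet};

// Stand-ins for LDK's SentHTLCId / PaymentPreimage (illustrative only).
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
enum SentHtlcId {
    PreviousHopData { short_channel_id: u64, htlc_id: u64 },
    OutboundRoute { session_priv: [u8; 32] },
}
#[derive(Clone, Copy, Debug)]
struct Preimage([u8; 32]);

// Mirrors `counterparty_fulfilled_htlcs`: preimages received via
// update_fulfill_htlc, persisted alongside the rest of the monitor state.
#[derive(Default)]
struct MonitorState {
    counterparty_fulfilled_htlcs: HashMap<SentHtlcId, Preimage>,
}

// On ChannelManager reload, any HTLC the (possibly stale) manager still thinks is
// pending, but which the monitor knows was fulfilled, gets its claim replayed so a
// PaymentSent event is regenerated.
fn replay_claims(
    monitor: &MonitorState,
    manager_pending: &HashSet<SentHtlcId>,
    mut on_payment_sent: impl FnMut(SentHtlcId, Preimage),
) {
    for (id, preimage) in &monitor.counterparty_fulfilled_htlcs {
        if manager_pending.contains(id) {
            on_payment_sent(*id, *preimage);
        }
    }
}

fn main() {
    let mut monitor = MonitorState::default();
    let id = SentHtlcId::OutboundRoute { session_priv: [7; 32] };
    // The counterparty claimed the HTLC; the monitor records the preimage.
    monitor.counterparty_fulfilled_htlcs.insert(id, Preimage([1; 32]));

    // The manager was persisted *before* the claim, so it still lists the HTLC.
    let mut pending = HashSet::new();
    pending.insert(id);

    replay_claims(&monitor, &pending, |_, preimage| {
        println!("regenerate PaymentSent for preimage starting with {:02x}", preimage.0[0]);
    });
}

Keying on the guaranteed-unique subset of `HTLCSource` (previous-hop scid/htlc-id, or the outbound session key) is what lets the monitor and a reloaded manager agree on which HTLC a given preimage belongs to.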
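
Backwards compatibility falls out of the TLV type numbers chosen above: `claimed_htlcs` is written at odd type 1 inside `LatestHolderCommitmentTXInfo`, and `counterparty_fulfilled_htlcs` at odd type 15 in the monitor, so readers that predate them skip the fields and a new reader reading old data falls back to an empty map. A rough model of that odd/even rule, with a hand-rolled loop standing in for LDK's TLV macros and the wire encoding simplified away:

use std::collections::HashMap;

// Minimal model of LDK-style TLV reading: unknown *odd* types are skipped for
// forward compatibility, unknown *even* types are a hard decode error.
fn read_tlvs(stream: &[(u64, Vec<u8>)]) -> Result<HashMap<u64, Vec<u8>>, ()> {
    let mut known = HashMap::new();
    for (typ, value) in stream {
        let typ = *typ;
        if typ == 1 || typ == 15 {
            // Types this (newer) reader understands, e.g. `claimed_htlcs` (1) in the
            // update step or `counterparty_fulfilled_htlcs` (15) in the monitor.
            known.insert(typ, value.clone());
        } else if typ % 2 == 1 {
            // Unknown odd type: safe to ignore, an older reader does the same.
        } else {
            // Unknown even type: the data cannot be understood safely.
            return Err(());
        }
    }
    Ok(known)
}

fn main() {
    // Old serialized data without type 15 still parses; the monitor read code then
    // falls back to an empty fulfilled-HTLC map.
    assert!(read_tlvs(&[(1, vec![0u8])]).is_ok());
    // Data written by a newer version with an extra odd field stays readable too.
    assert!(read_tlvs(&[(1, vec![0u8]), (17, vec![])]).is_ok());
}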
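
Finally, the small `into_inner` additions to the sync shims exist because the reload path temporarily wraps `pending_events_read` in a `Mutex` so `claim_htlc` can append events through a shared reference, then takes the vector back out. The same pattern with `std::sync::Mutex` and the event type simplified to `String`:

use std::sync::Mutex;

fn claim_htlc(pending_events: &Mutex<Vec<String>>) {
    // Stand-in for OutboundPayments::claim_htlc pushing a PaymentSent event.
    pending_events.lock().unwrap().push("PaymentSent".to_owned());
}

fn main() {
    let mut pending_events_read = vec!["ChannelClosed".to_owned()];

    // Wrap, let the callee append events through the shared reference...
    let pending_events = Mutex::new(pending_events_read);
    claim_htlc(&pending_events);

    // ...then recover ownership of the Vec. This is the step that needs the new
    // Mutex::into_inner shims under debug_sync and nostd_sync.
    pending_events_read = pending_events.into_inner().unwrap();
    assert_eq!(pending_events_read.len(), 2);
}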