From 82b532c54db938c64f9bc9fb96c8f572337052be Mon Sep 17 00:00:00 2001
From: Matt Corallo
Date: Wed, 11 Oct 2023 01:39:26 +0000
Subject: [PATCH 1/4] Log when we prepare to block a channel's next
 `revoke_and_ack`

This may help in debugging blocking actions in the future.
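For reviewers unfamiliar with this map: conceptually,
`actions_blocking_raa_monitor_updates` behaves like the reduced model
below (`BlockingSet` and `Blocker` are illustrative stand-ins, not real
LDK types). While a channel has any entry in the map, its next
`revoke_and_ack` is held back, so a trace line at insertion time is the
natural breadcrumb when hunting a stuck channel:

    use std::collections::HashMap;

    type ChannelId = [u8; 32];

    #[derive(Clone, PartialEq, Eq, Debug)]
    struct Blocker(u64);

    #[derive(Default)]
    struct BlockingSet {
        // While a channel's entry here is non-empty, its next
        // `revoke_and_ack` is held back.
        actions: HashMap<ChannelId, Vec<Blocker>>,
    }

    impl BlockingSet {
        fn block(&mut self, chan: ChannelId, blocker: Blocker) {
            // This patch adds a trace log at exactly this point, so a
            // stuck channel can be traced to the blocker added here.
            println!("Holding the next revoke_and_ack on {:02x?}", &chan[..4]);
            self.actions.entry(chan).or_insert_with(Vec::new).push(blocker);
        }
    }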
---
 lightning/src/ln/channelmanager.rs | 6 ++++++
 1 file changed, 6 insertions(+)

diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs
index 1e9a3ed011d..69093d7f6a3 100644
--- a/lightning/src/ln/channelmanager.rs
+++ b/lightning/src/ln/channelmanager.rs
@@ -6387,6 +6387,9 @@ where
 			if let ChannelPhase::Funded(chan) = chan_phase_entry.get_mut() {
 				let res = try_chan_phase_entry!(self, chan.update_fulfill_htlc(&msg), chan_phase_entry);
 				if let HTLCSource::PreviousHopData(prev_hop) = &res.0 {
+					log_trace!(self.logger,
+						"Holding the next revoke_and_ack from {} until the preimage is durably persisted in the inbound edge's ChannelMonitor",
+						msg.channel_id);
 					peer_state.actions_blocking_raa_monitor_updates.entry(msg.channel_id)
 						.or_insert_with(Vec::new)
 						.push(RAAMonitorUpdateBlockingAction::from_prev_hop_data(&prev_hop));
@@ -9972,6 +9975,9 @@ where
 					Some((blocked_node_id, blocked_channel_outpoint, blocking_action)), ..
 				} = action {
 					if let Some(blocked_peer_state) = per_peer_state.get(&blocked_node_id) {
+						log_trace!(args.logger,
+							"Holding the next revoke_and_ack from {} until the preimage is durably persisted in the inbound edge's ChannelMonitor",
+							blocked_channel_outpoint.to_channel_id());
 						blocked_peer_state.lock().unwrap().actions_blocking_raa_monitor_updates
 							.entry(blocked_channel_outpoint.to_channel_id())
 							.or_insert_with(Vec::new).push(blocking_action.clone());

From 6d85be27d4f499d18f3626d457f91c9e0ca5ff16 Mon Sep 17 00:00:00 2001
From: Matt Corallo
Date: Thu, 12 Oct 2023 22:26:07 +0000
Subject: [PATCH 2/4] Indicate to `claim_funds_internal` that we're replaying
 on startup

While we'd previously avoided plumbing this flag through, it is sadly
now required in the next commit.
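Reduced to its shape, the change is mechanical (the hypothetical free
functions below stand in for the real `ChannelManager` methods):

    // `claim_funds_internal` gains a `startup_replay` flag so the claim
    // path can tell whether monitors are fully loaded or whether we are
    // replaying claims found during deserialization.
    fn claim_funds_internal(from_onchain: bool, startup_replay: bool) {
        if startup_replay {
            // Monitor updates must be queued as background events here
            // rather than applied directly (see the next commit).
        }
        let _ = from_onchain;
    }

    // The three call sites below mirror the ones this patch touches.
    fn on_update_fulfill_htlc_msg() { claim_funds_internal(false, false); }
    fn on_monitor_event_claim() { claim_funds_internal(true, false); }
    fn on_startup_claim_replay(downstream_closed: bool) {
        claim_funds_internal(downstream_closed, true);
    }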
---
 lightning/src/ln/channelmanager.rs | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs
index 69093d7f6a3..9340dbbee03 100644
--- a/lightning/src/ln/channelmanager.rs
+++ b/lightning/src/ln/channelmanager.rs
@@ -5492,7 +5492,7 @@ where
 	}
 
 	fn claim_funds_internal(&self, source: HTLCSource, payment_preimage: PaymentPreimage,
-		forwarded_htlc_value_msat: Option<u64>, from_onchain: bool,
+		forwarded_htlc_value_msat: Option<u64>, from_onchain: bool, startup_replay: bool,
 		next_channel_counterparty_node_id: Option<PublicKey>, next_channel_outpoint: OutPoint
 	) {
 		match source {
@@ -6410,7 +6410,7 @@ where
 				hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Got a message for a channel from the wrong node! No such channel for the passed counterparty_node_id {}", counterparty_node_id), msg.channel_id))
 			}
 		};
-		self.claim_funds_internal(htlc_source, msg.payment_preimage.clone(), Some(forwarded_htlc_value), false, Some(*counterparty_node_id), funding_txo);
+		self.claim_funds_internal(htlc_source, msg.payment_preimage.clone(), Some(forwarded_htlc_value), false, false, Some(*counterparty_node_id), funding_txo);
 		Ok(())
 	}
@@ -6863,7 +6863,7 @@ where
 				MonitorEvent::HTLCEvent(htlc_update) => {
 					if let Some(preimage) = htlc_update.payment_preimage {
 						log_trace!(self.logger, "Claiming HTLC with preimage {} from our monitor", preimage);
-						self.claim_funds_internal(htlc_update.source, preimage, htlc_update.htlc_value_satoshis.map(|v| v * 1000), true, counterparty_node_id, funding_outpoint);
+						self.claim_funds_internal(htlc_update.source, preimage, htlc_update.htlc_value_satoshis.map(|v| v * 1000), true, false, counterparty_node_id, funding_outpoint);
 					} else {
 						log_trace!(self.logger, "Failing HTLC with hash {} from our monitor", &htlc_update.payment_hash);
 						let receiver = HTLCDestination::NextHopChannel { node_id: counterparty_node_id, channel_id: funding_outpoint.to_channel_id() };
@@ -10059,7 +10059,7 @@ where
 			// don't remember in the `ChannelMonitor` where we got a preimage from, but if the
 			// channel is closed we just assume that it probably came from an on-chain claim.
 			channel_manager.claim_funds_internal(source, preimage, Some(downstream_value),
-				downstream_closed, downstream_node_id, downstream_funding);
+				downstream_closed, true, downstream_node_id, downstream_funding);
 		}
 
 		//TODO: Broadcast channel update for closed channels, but only after we've made a

From 9ea90a4a445bde06debcfa291141514055e08c0e Mon Sep 17 00:00:00 2001
From: Matt Corallo
Date: Wed, 11 Oct 2023 13:56:00 +0000
Subject: [PATCH 3/4] Add an immediately-freeing
 `MonitorUpdateCompletionAction`.

When `MonitorUpdateCompletionAction`s were added, we didn't consider
the case of a duplicate claim during normal HTLC processing (as the
handling only had an `if let` rather than a `match`, which made the
branch easy to miss). This can lead to a channel freezing
indefinitely if an HTLC is claimed (without a `commitment_signed`),
the peer disconnects, and then the HTLC is claimed again, leading to
a never-completing `MonitorUpdateCompletionAction`.

The fix is simple - if we get back an
`UpdateFulfillCommitFetch::DuplicateClaim` when claiming from the
inbound edge, immediately unlock the outbound edge channel with a new
`MonitorUpdateCompletionAction::FreeOtherChannelImmediately`.

Here we add the new variant, which we start generating in the next
commit.
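To see why this variant has to be processed inline, consider the
reduced model below (the types are illustrative, not the real LDK
ones). A duplicate claim produces no monitor update, so an action
pushed onto the completion queue at that point would never be drained:

    enum FulfillResult { NewClaim, DuplicateClaim }

    #[derive(Debug)]
    enum CompletionAction { EmitEventAndFree, FreeOtherChannelImmediately }

    fn claim(fulfill: FulfillResult, queue: &mut Vec<CompletionAction>) {
        match fulfill {
            // A fresh claim produces a monitor update; the queued action
            // runs when that update completes.
            FulfillResult::NewClaim => queue.push(CompletionAction::EmitEventAndFree),
            // A duplicate claim produces *no* monitor update - anything
            // queued here would sit forever, leaving the other channel
            // blocked. Hence the new variant is handled immediately.
            FulfillResult::DuplicateClaim => {
                let action = CompletionAction::FreeOtherChannelImmediately;
                println!("handling {:?} inline rather than queueing it", action);
            }
        }
    }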
---
 lightning/src/ln/channelmanager.rs | 36 ++++++++++++++++++++++++++++++
 1 file changed, 36 insertions(+)

diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs
index 9340dbbee03..aa5f27606f0 100644
--- a/lightning/src/ln/channelmanager.rs
+++ b/lightning/src/ln/channelmanager.rs
@@ -615,10 +615,34 @@
 		event: events::Event,
 		downstream_counterparty_and_funding_outpoint: Option<(PublicKey, OutPoint, RAAMonitorUpdateBlockingAction)>,
 	},
+	/// Indicates we should immediately resume the operation of another channel, unless there is
+	/// some other reason why the channel is blocked. In practice this simply means immediately
+	/// removing the [`RAAMonitorUpdateBlockingAction`] provided from the blocking set.
+	///
+	/// This is usually generated when we've forwarded an HTLC and want to block the outbound edge
+	/// from completing a monitor update which removes the payment preimage until the inbound edge
+	/// completes a monitor update containing the payment preimage. However, we use this variant
+	/// instead of [`Self::EmitEventAndFreeOtherChannel`] when we discover that the claim was in
+	/// fact duplicative and we simply want to resume the outbound edge channel immediately.
+	///
+	/// This variant should thus never be written to disk, as it is processed inline rather than
+	/// stored for later processing.
+	FreeOtherChannelImmediately {
+		downstream_counterparty_node_id: PublicKey,
+		downstream_funding_outpoint: OutPoint,
+		blocking_action: RAAMonitorUpdateBlockingAction,
+	},
 }
 
 impl_writeable_tlv_based_enum_upgradable!(MonitorUpdateCompletionAction,
 	(0, PaymentClaimed) => { (0, payment_hash, required) },
+	// Note that FreeOtherChannelImmediately should never be written - we were supposed to free
+	// *immediately*. However, for simplicity we implement read/write here.
+	(1, FreeOtherChannelImmediately) => {
+		(0, downstream_counterparty_node_id, required),
+		(2, downstream_funding_outpoint, required),
+		(4, blocking_action, required),
+	},
 	(2, EmitEventAndFreeOtherChannel) => {
 		(0, event, upgradable_required),
 		// LDK prior to 0.0.116 did not have this field as the monitor update application order was
@@ -5585,6 +5609,15 @@ where
 					self.handle_monitor_update_release(node_id, funding_outpoint, Some(blocker));
 				}
 			},
+			MonitorUpdateCompletionAction::FreeOtherChannelImmediately {
+				downstream_counterparty_node_id, downstream_funding_outpoint, blocking_action,
+			} => {
+				self.handle_monitor_update_release(
+					downstream_counterparty_node_id,
+					downstream_funding_outpoint,
+					Some(blocking_action),
+				);
+			},
 		}
 	}
 }
@@ -9989,6 +10022,9 @@ where
 						// anymore.
 					}
 				}
+				if let MonitorUpdateCompletionAction::FreeOtherChannelImmediately { .. } = action {
+					debug_assert!(false, "Non-event-generating channel freeing should not appear in our queue");
+				}
 			}
 		}
 		peer_state.lock().unwrap().monitor_update_blocked_actions = monitor_update_blocked_actions;

From f47270e760c2f6371d2601ecc4bcf9d24c790926 Mon Sep 17 00:00:00 2001
From: Matt Corallo
Date: Wed, 11 Oct 2023 14:01:28 +0000
Subject: [PATCH 4/4] Immediately unblock channels on duplicate claims

When `MonitorUpdateCompletionAction`s were added, we didn't consider
the case of a duplicate claim during normal HTLC processing (as the
handling only had an `if let` rather than a `match`, which made the
branch easy to miss). This can lead to a channel freezing
indefinitely if an HTLC is claimed (without a `commitment_signed`),
the peer disconnects, and then the HTLC is claimed again, leading to
a never-completing `MonitorUpdateCompletionAction`.

The fix is simple - if we get back an
`UpdateFulfillCommitFetch::DuplicateClaim` when claiming from the
inbound edge, immediately unlock the outbound edge channel with a new
`MonitorUpdateCompletionAction::FreeOtherChannelImmediately`.

Here we implement this fix by actually generating the new variant
when a claim is duplicative.
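One subtlety in the unblocking path: if the same blocker was somehow
registered twice, exactly one instance must be removed. Extracted into
a standalone sketch (with a toy element type), the `retain` trick used
below works like this:

    // Remove exactly one instance of `blocker`, keeping any later
    // duplicates - the pattern the `DuplicateClaim` arm applies to the
    // real `RAAMonitorUpdateBlockingAction` list.
    fn remove_one<T: PartialEq>(blockers: &mut Vec<T>, blocker: &T) -> bool {
        let mut found = false;
        blockers.retain(|iter| {
            // Only the first match is dropped; later equal entries survive.
            let first_match = !found;
            if *iter == *blocker { found = true; }
            *iter != *blocker || !first_match
        });
        found
    }

    fn main() {
        let mut v = vec![1, 2, 2, 3];
        assert!(remove_one(&mut v, &2));
        assert_eq!(v, vec![1, 2, 3]); // exactly one `2` removed
    }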
---
 lightning/src/ln/chanmon_update_fail_tests.rs |  69 ++++++
 lightning/src/ln/channelmanager.rs            | 197 ++++++++++++++----
 2 files changed, 227 insertions(+), 39 deletions(-)

diff --git a/lightning/src/ln/chanmon_update_fail_tests.rs b/lightning/src/ln/chanmon_update_fail_tests.rs
index dc9bbd3e404..6fb21b3a15b 100644
--- a/lightning/src/ln/chanmon_update_fail_tests.rs
+++ b/lightning/src/ln/chanmon_update_fail_tests.rs
@@ -3431,3 +3431,72 @@ fn test_reload_mon_update_completion_actions() {
 	do_test_reload_mon_update_completion_actions(true);
 	do_test_reload_mon_update_completion_actions(false);
 }
+
+fn do_test_glacial_peer_cant_hang(hold_chan_a: bool) {
+	// Test that if a peer manages to send an `update_fulfill_htlc` message without a
+	// `commitment_signed`, disconnects, then replays the `update_fulfill_htlc` message it doesn't
+	// result in a channel hang. This was previously broken as the `DuplicateClaim` case wasn't
+	// handled when claiming an HTLC and handling wasn't added when completion actions were added
+	// (which must always complete at some point).
+	let chanmon_cfgs = create_chanmon_cfgs(3);
+	let node_cfgs = create_node_cfgs(3, &chanmon_cfgs);
+	let node_chanmgrs = create_node_chanmgrs(3, &node_cfgs, &[None, None, None]);
+	let mut nodes = create_network(3, &node_cfgs, &node_chanmgrs);
+
+	create_announced_chan_between_nodes(&nodes, 0, 1);
+	create_announced_chan_between_nodes(&nodes, 1, 2);
+
+	// Route a payment from A, through B, to C, then claim it on C. Replay the
+	// `update_fulfill_htlc` twice on B to check that B doesn't hang.
+	let (payment_preimage, payment_hash, ..) = route_payment(&nodes[0], &[&nodes[1], &nodes[2]], 1_000_000);
+
+	nodes[2].node.claim_funds(payment_preimage);
+	check_added_monitors(&nodes[2], 1);
+	expect_payment_claimed!(nodes[2], payment_hash, 1_000_000);
+
+	let cs_updates = get_htlc_update_msgs(&nodes[2], &nodes[1].node.get_our_node_id());
+	if hold_chan_a {
+		// The first monitor update is the preimage-carrying update on the A <-> B channel;
+		// hold it by marking its persistence as in-progress.
+		chanmon_cfgs[1].persister.set_update_ret(ChannelMonitorUpdateStatus::InProgress);
+	}
+	nodes[1].node.handle_update_fulfill_htlc(&nodes[2].node.get_our_node_id(), &cs_updates.update_fulfill_htlcs[0]);
+	check_added_monitors(&nodes[1], 1);
+
+	if !hold_chan_a {
+		let bs_updates = get_htlc_update_msgs(&nodes[1], &nodes[0].node.get_our_node_id());
+		nodes[0].node.handle_update_fulfill_htlc(&nodes[1].node.get_our_node_id(), &bs_updates.update_fulfill_htlcs[0]);
+		commitment_signed_dance!(nodes[0], nodes[1], bs_updates.commitment_signed, false);
+		expect_payment_sent!(&nodes[0], payment_preimage);
+	}
+
+	nodes[1].node.peer_disconnected(&nodes[2].node.get_our_node_id());
+	nodes[2].node.peer_disconnected(&nodes[1].node.get_our_node_id());
+
+	let mut reconnect = ReconnectArgs::new(&nodes[1], &nodes[2]);
+	reconnect.pending_htlc_claims = (1, 0);
+	reconnect_nodes(reconnect);
+
+	if !hold_chan_a {
+		expect_payment_forwarded!(nodes[1], nodes[0], nodes[2], Some(1000), false, false);
+		send_payment(&nodes[0], &[&nodes[1], &nodes[2]], 100_000);
+	} else {
+		assert!(nodes[1].node.get_and_clear_pending_events().is_empty());
+		assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty());
+
+		let (route, payment_hash_2, _, payment_secret_2) = get_route_and_payment_hash!(&nodes[1], nodes[2], 1_000_000);
+
+		nodes[1].node.send_payment_with_route(&route, payment_hash_2,
+			RecipientOnionFields::secret_only(payment_secret_2), PaymentId(payment_hash_2.0)).unwrap();
+		check_added_monitors(&nodes[1], 0);
+
+		assert!(nodes[1].node.get_and_clear_pending_events().is_empty());
+		assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty());
+	}
+}
+
+#[test]
+fn test_glacial_peer_cant_hang() {
+	do_test_glacial_peer_cant_hang(false);
+	do_test_glacial_peer_cant_hang(true);
+}
diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs
index aa5f27606f0..1a4bdfbf6ee 100644
--- a/lightning/src/ln/channelmanager.rs
+++ b/lightning/src/ln/channelmanager.rs
@@ -563,6 +563,7 @@ struct ClaimablePayments {
 /// usually because we're running pre-full-init. They are handled immediately once we detect we are
 /// running normally, and specifically must be processed before any other non-background
 /// [`ChannelMonitorUpdate`]s are applied.
+#[derive(Debug)]
 enum BackgroundEvent {
 	/// Handle a ChannelMonitorUpdate which closes the channel or for an already-closed channel.
 	/// This is only separated from [`Self::MonitorUpdateRegeneratedOnStartup`] as the
@@ -5381,8 +5382,11 @@ where
 		for htlc in sources.drain(..) {
 			if let Err((pk, err)) = self.claim_funds_from_hop(
 				htlc.prev_hop, payment_preimage,
-				|_| Some(MonitorUpdateCompletionAction::PaymentClaimed { payment_hash }))
-			{
+				|_, definitely_duplicate| {
+					debug_assert!(!definitely_duplicate, "We shouldn't claim duplicatively from a payment");
+					Some(MonitorUpdateCompletionAction::PaymentClaimed { payment_hash })
+				}
+			) {
 				if let msgs::ErrorAction::IgnoreError = err.err.action {
 					// We got a temporary failure updating monitor, but will claim the
 					// HTLC when the monitor updating is restored (or on chain).
@@ -5410,7 +5414,7 @@ where
 		}
 	}
 
-	fn claim_funds_from_hop<ComplFunc: FnOnce(Option<u64>) -> Option<MonitorUpdateCompletionAction>>(&self,
+	fn claim_funds_from_hop<ComplFunc: FnOnce(Option<u64>, bool) -> Option<MonitorUpdateCompletionAction>>(&self,
 		prev_hop: HTLCPreviousHopData, payment_preimage: PaymentPreimage, completion_action: ComplFunc)
 	-> Result<(), (PublicKey, MsgHandleErrInternal)> {
 		//TODO: Delay the claimed_funds relaying just like we do outbound relay!
@@ -5420,6 +5424,11 @@ where
 		// `BackgroundEvent`s.
 		let during_init = !self.background_events_processed_since_startup.load(Ordering::Acquire);
 
+		// As we may call handle_monitor_update_completion_actions in rather rare cases, check that
+		// the required mutexes are not held before we start.
+		debug_assert_ne!(self.pending_events.held_by_thread(), LockHeldState::HeldByThread);
+		debug_assert_ne!(self.claimable_payments.held_by_thread(), LockHeldState::HeldByThread);
+
 		{
 			let per_peer_state = self.per_peer_state.read().unwrap();
 			let chan_id = prev_hop.outpoint.to_channel_id();
@@ -5441,25 +5450,70 @@ where
 				let counterparty_node_id = chan.context.get_counterparty_node_id();
 				let fulfill_res = chan.get_update_fulfill_htlc_and_commit(prev_hop.htlc_id, payment_preimage, &self.logger);
-				if let UpdateFulfillCommitFetch::NewClaim { htlc_value_msat, monitor_update } = fulfill_res {
-					if let Some(action) = completion_action(Some(htlc_value_msat)) {
-						log_trace!(self.logger, "Tracking monitor update completion action for channel {}: {:?}",
-							chan_id, action);
-						peer_state.monitor_update_blocked_actions.entry(chan_id).or_insert(Vec::new()).push(action);
+				match fulfill_res {
+					UpdateFulfillCommitFetch::NewClaim { htlc_value_msat, monitor_update } => {
+						if let Some(action) = completion_action(Some(htlc_value_msat), false) {
+							log_trace!(self.logger, "Tracking monitor update completion action for channel {}: {:?}",
+								chan_id, action);
+							peer_state.monitor_update_blocked_actions.entry(chan_id).or_insert(Vec::new()).push(action);
+						}
+						if !during_init {
+							handle_new_monitor_update!(self, prev_hop.outpoint, monitor_update, peer_state_lock,
+								peer_state, per_peer_state, chan);
+						} else {
+							// If we're running during init we cannot update a monitor directly -
+							// they probably haven't actually been loaded yet. Instead, push the
+							// monitor update as a background event.
+							self.pending_background_events.lock().unwrap().push(
+								BackgroundEvent::MonitorUpdateRegeneratedOnStartup {
+									counterparty_node_id,
+									funding_txo: prev_hop.outpoint,
+									update: monitor_update.clone(),
+								});
+						}
 					}
-					if !during_init {
-						handle_new_monitor_update!(self, prev_hop.outpoint, monitor_update, peer_state_lock,
-							peer_state, per_peer_state, chan);
-					} else {
-						// If we're running during init we cannot update a monitor directly -
-						// they probably haven't actually been loaded yet. Instead, push the
-						// monitor update as a background event.
-						self.pending_background_events.lock().unwrap().push(
-							BackgroundEvent::MonitorUpdateRegeneratedOnStartup {
-								counterparty_node_id,
-								funding_txo: prev_hop.outpoint,
-								update: monitor_update.clone(),
-							});
+					UpdateFulfillCommitFetch::DuplicateClaim {} => {
+						let action = if let Some(action) = completion_action(None, true) {
+							action
+						} else {
+							return Ok(());
+						};
+						mem::drop(peer_state_lock);
+
+						log_trace!(self.logger, "Completing monitor update completion action for channel {} as claim was redundant: {:?}",
+							chan_id, action);
+						let (node_id, funding_outpoint, blocker) =
+							if let MonitorUpdateCompletionAction::FreeOtherChannelImmediately {
+								downstream_counterparty_node_id: node_id,
+								downstream_funding_outpoint: funding_outpoint,
+								blocking_action: blocker,
+							} = action {
+								(node_id, funding_outpoint, blocker)
+							} else {
+								debug_assert!(false,
+									"Duplicate claims should always free another channel immediately");
+								return Ok(());
+							};
+						if let Some(peer_state_mtx) = per_peer_state.get(&node_id) {
+							let mut peer_state = peer_state_mtx.lock().unwrap();
+							if let Some(blockers) = peer_state
+								.actions_blocking_raa_monitor_updates
+								.get_mut(&funding_outpoint.to_channel_id())
+							{
+								let mut found_blocker = false;
+								blockers.retain(|iter| {
+									// Note that we could actually be blocked, in
+									// which case we need to only remove the one
+									// blocker which was added duplicatively.
+									let first_blocker = !found_blocker;
+									if *iter == blocker { found_blocker = true; }
+									*iter != blocker || !first_blocker
+								});
+								debug_assert!(found_blocker);
+							}
+						} else {
+							debug_assert!(false);
+						}
 					}
 				}
 			}
@@ -5507,7 +5561,7 @@ where
 		// `ChannelMonitor` we've provided the above update to. Instead, note that `Event`s are
 		// generally always allowed to be duplicative (and it's specifically noted in
 		// `PaymentForwarded`).
-		self.handle_monitor_update_completion_actions(completion_action(None));
+		self.handle_monitor_update_completion_actions(completion_action(None, false));
 		Ok(())
 	}
@@ -5537,13 +5591,84 @@ where
 			HTLCSource::PreviousHopData(hop_data) => {
 				let prev_outpoint = hop_data.outpoint;
 				let completed_blocker = RAAMonitorUpdateBlockingAction::from_prev_hop_data(&hop_data);
+				#[cfg(debug_assertions)]
+				let claiming_chan_funding_outpoint = hop_data.outpoint;
 				let res = self.claim_funds_from_hop(hop_data, payment_preimage,
-					|htlc_claim_value_msat| {
-						if let Some(forwarded_htlc_value) = forwarded_htlc_value_msat {
-							let fee_earned_msat = if let Some(claimed_htlc_value) = htlc_claim_value_msat {
-								Some(claimed_htlc_value - forwarded_htlc_value)
-							} else { None };
+					|htlc_claim_value_msat, definitely_duplicate| {
+						let chan_to_release =
+							if let Some(node_id) = next_channel_counterparty_node_id {
+								Some((node_id, next_channel_outpoint, completed_blocker))
+							} else {
+								// We can only get `None` here if we are processing a
+								// `ChannelMonitor`-originated event, in which case we
+								// don't care about ensuring we wake the downstream
+								// channel's monitor updating - the channel is already
+								// closed.
+								None
+							};
+
+						if definitely_duplicate && startup_replay {
+							// On startup we may get redundant claims which are related to
+							// monitor updates still in flight. In that case, we shouldn't
+							// immediately free, but instead let that monitor update complete
+							// in the background.
+							#[cfg(debug_assertions)] {
+								let background_events = self.pending_background_events.lock().unwrap();
+								// There should be a `BackgroundEvent` pending...
+								assert!(background_events.iter().any(|ev| {
+									match ev {
+										// to apply a monitor update that blocked the claiming channel,
+										BackgroundEvent::MonitorUpdateRegeneratedOnStartup {
+											funding_txo, update, ..
+										} => {
+											if *funding_txo == claiming_chan_funding_outpoint {
+												assert!(update.updates.iter().any(|upd|
+													if let ChannelMonitorUpdateStep::PaymentPreimage {
+														payment_preimage: update_preimage
+													} = upd {
+														payment_preimage == *update_preimage
+													} else { false }
+												), "{:?}", update);
+												true
+											} else { false }
+										},
+										// or the channel we'd unblock is already closed,
+										BackgroundEvent::ClosedMonitorUpdateRegeneratedOnStartup(
+											(funding_txo, monitor_update)
+										) => {
+											if *funding_txo == next_channel_outpoint {
+												assert_eq!(monitor_update.updates.len(), 1);
+												assert!(matches!(
+													monitor_update.updates[0],
+													ChannelMonitorUpdateStep::ChannelForceClosed { .. }
+												));
+												true
+											} else { false }
+										},
+										// or the monitor update has completed and will unblock
+										// immediately once we get going.
+										BackgroundEvent::MonitorUpdatesComplete {
+											channel_id, ..
+										} =>
+											*channel_id == claiming_chan_funding_outpoint.to_channel_id(),
+									}
+								}), "{:?}", *background_events);
+							}
+							None
+						} else if definitely_duplicate {
+							if let Some(other_chan) = chan_to_release {
+								Some(MonitorUpdateCompletionAction::FreeOtherChannelImmediately {
+									downstream_counterparty_node_id: other_chan.0,
+									downstream_funding_outpoint: other_chan.1,
+									blocking_action: other_chan.2,
+								})
+							} else { None }
+						} else {
+							let fee_earned_msat = if let Some(forwarded_htlc_value) = forwarded_htlc_value_msat {
+								if let Some(claimed_htlc_value) = htlc_claim_value_msat {
+									Some(claimed_htlc_value - forwarded_htlc_value)
+								} else { None }
+							} else { None };
 							Some(MonitorUpdateCompletionAction::EmitEventAndFreeOtherChannel {
 								event: events::Event::PaymentForwarded {
 									fee_earned_msat,
@@ -5552,19 +5677,9 @@ where
 									next_channel_id: Some(next_channel_outpoint.to_channel_id()),
 									outbound_amount_forwarded_msat: forwarded_htlc_value_msat,
 								},
-								downstream_counterparty_and_funding_outpoint:
-									if let Some(node_id) = next_channel_counterparty_node_id {
-										Some((node_id, next_channel_outpoint, completed_blocker))
-									} else {
-										// We can only get `None` here if we are processing a
-										// `ChannelMonitor`-originated event, in which case we
-										// don't care about ensuring we wake the downstream
-										// channel's monitor updating - the channel is already
-										// closed.
-										None
-									},
+								downstream_counterparty_and_funding_outpoint: chan_to_release,
 							})
-						} else { None }
+						}
 					});
 				if let Err((pk, err)) = res {
 					let result: Result<(), _> = Err(err);
@@ -5580,6 +5695,10 @@ where
 	}
 
 	fn handle_monitor_update_completion_actions<I: IntoIterator<Item=MonitorUpdateCompletionAction>>(&self, actions: I) {
+		debug_assert_ne!(self.pending_events.held_by_thread(), LockHeldState::HeldByThread);
+		debug_assert_ne!(self.claimable_payments.held_by_thread(), LockHeldState::HeldByThread);
+		debug_assert_ne!(self.per_peer_state.held_by_thread(), LockHeldState::HeldByThread);
+
 		for action in actions.into_iter() {
 			match action {
 				MonitorUpdateCompletionAction::PaymentClaimed { payment_hash } =>