
Move claimable_htlcs to separate lock #1772


Merged
165 changes: 85 additions & 80 deletions lightning/src/ln/channelmanager.rs
@@ -395,13 +395,6 @@ pub(super) enum RAACommitmentOrder {
// Note this is only exposed in cfg(test):
pub(super) struct ChannelHolder<Signer: Sign> {
pub(super) by_id: HashMap<[u8; 32], Channel<Signer>>,
/// Map from payment hash to the payment data and any HTLCs which are to us and can be
/// failed/claimed by the user.
///
/// Note that while this is held in the same mutex as the channels themselves, no consistency
/// guarantees are made about the channels given here actually existing anymore by the time you
/// go to read them!
claimable_htlcs: HashMap<PaymentHash, (events::PaymentPurpose, Vec<ClaimableHTLC>)>,
/// Messages to send to peers - pushed to in the same lock that they are generated in (except
/// for broadcast messages, where ordering isn't as strict).
pub(super) pending_msg_events: Vec<MessageSendEvent>,
@@ -670,19 +663,21 @@ pub type SimpleRefChannelManager<'a, 'b, 'c, 'd, 'e, M, T, F, L> = ChannelManage
// |
// |__`forward_htlcs`
// |
// |__`channel_state`
// |__`pending_inbound_payments`
@ViktorTigerstrom (Contributor, Author), Nov 3, 2022:

Just a general question: would we actually, in the long run, like pending_inbound_payments and pending_outbound_payments to be peer specific (i.e. stored in the PeerState)? I haven't really researched all the implications that would have yet. From my first hunch it'd be pretty beneficial for parallelization (especially for pending_outbound_payments), but it will be hard in terms of backwards compatibility. But if we would like it, maybe my last commit isn't such a great idea in the long run, though it might be ok for now.

Edit: Oh, actually, as they'd be part of the PeerState mutex lock and not individual locks any longer, the lock order change by the last commit should be ok :)
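
A minimal, hypothetical sketch of the peer-scoped payment state this comment is asking about. The field and type names below are assumptions made up for the sketch, not the actual rust-lightning definitions:

// Hypothetical sketch only, not the real PeerState: it shows what
// peer-scoped payment maps could look like if they lived behind the single
// per-peer Mutex, letting work for different peers proceed in parallel
// rather than contending on one global pending_outbound_payments lock.
use std::collections::HashMap;
use std::sync::Mutex;

#[derive(Clone, Copy, PartialEq, Eq, Hash)]
struct PaymentHash([u8; 32]);

struct PendingOutboundPayment; // stand-in for the real per-payment state

struct PeerState {
    // Outbound payments routed through this peer, keyed by payment hash.
    pending_outbound_payments: HashMap<PaymentHash, PendingOutboundPayment>,
}

// One Mutex per peer, keyed by the counterparty node id (33-byte pubkey).
struct PerPeerState {
    peers: HashMap<[u8; 33], Mutex<PeerState>>,
}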

// | |
// | |__`id_to_peer`
// | |__`claimable_htlcs`
// | |
// | |__`short_to_chan_info`
// | |
// | |__`per_peer_state`
// | |
// | |__`outbound_scid_aliases`
// | |__`pending_outbound_payments`
Collaborator:

I don't think this is true - claim_funds_internal takes a pending_outbound_payments lock while holding the channel_state mutex. Maybe just drop the mutex-order-swap commit for now and we can do it separately later?

@ViktorTigerstrom (Contributor, Author), Nov 15, 2022:

Hmm, the channel_state lock is dropped (mem::drop) before pending_outbound_payments is locked in claim_funds_internal, no?

I can of course instead drop the mutex-order-swap commit for now if we'd prefer though :). That will require that we clone the PaymentPurpose for the claimable_htlcs in the ChannelManager::write function as per the #1772 (comment) discussion, or that we do the weird scoping I did with 266175d. Are we ok with any of those alternatives?
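
A minimal sketch of the locking pattern described in this reply, with simplified stand-in types; the real claim_funds_internal does much more than this:

// Simplified sketch: the channel_state lock is explicitly dropped before
// pending_outbound_payments is taken, so the two mutexes are never held at
// the same time and no ordering between them is exercised here.
// Types and fields are illustrative stand-ins, not the real ChannelManager.
use std::collections::HashMap;
use std::mem;
use std::sync::Mutex;

struct ChannelHolder {
    by_id: HashMap<[u8; 32], ()>,
}

struct Manager {
    channel_state: Mutex<ChannelHolder>,
    pending_outbound_payments: Mutex<HashMap<[u8; 32], u64>>,
}

impl Manager {
    fn claim_funds_internal(&self, channel_id: [u8; 32]) {
        let channel_state = self.channel_state.lock().unwrap();
        let channel_known = channel_state.by_id.contains_key(&channel_id);
        // Release channel_state before touching the outbound-payments map.
        mem::drop(channel_state);

        if channel_known {
            self.pending_outbound_payments.lock().unwrap().remove(&channel_id);
        }
    }
}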

// | |
// | |__`pending_inbound_payments`
// | |__`channel_state`
// | |
// | |__`id_to_peer`
// | |
// | |__`pending_outbound_payments`
// | |__`short_to_chan_info`
// | |
// | |__`per_peer_state`
// | |
// | |__`outbound_scid_aliases`
// | |
// | |__`best_block`
// | |
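
For readers unfamiliar with this style of comment, a generic sketch of the rule such a lock-order tree encodes; the lock names here are placeholders rather than ChannelManager fields:

// Generic illustration: a lock shown nested under another in the tree may be
// acquired while the outer one is held, but the outer lock must never be
// acquired while still holding the inner one.
use std::sync::Mutex;

struct LockOrderExample {
    outer: Mutex<u32>, // stands in for a lock higher up in the tree
    inner: Mutex<u32>, // stands in for a lock indented beneath it
}

impl LockOrderExample {
    fn respects_order(&self) {
        let outer = self.outer.lock().unwrap();
        let mut inner = self.inner.lock().unwrap(); // inner while outer held: fine
        *inner += *outer;
    }

    fn drops_inner_before_outer(&self) {
        let inner = self.inner.lock().unwrap();
        // Taking `outer` while still holding `inner` would invert the
        // documented order and could deadlock against respects_order, so the
        // inner guard is dropped first (compare the mem::drop discussion
        // further down in this diff).
        drop(inner);
        let _outer = self.outer.lock().unwrap();
    }
}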
@@ -753,6 +748,15 @@ pub struct ChannelManager<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
#[cfg(not(test))]
forward_htlcs: Mutex<HashMap<u64, Vec<HTLCForwardInfo>>>,

/// Map from payment hash to the payment data and any HTLCs which are to us and can be
/// failed/claimed by the user.
///
/// Note that, no consistency guarantees are made about the channels given here actually
Collaborator:

nit: s/,//

/// existing anymore by the time you go to read them!
Reviewer:

I think it would help to be more precise about what "channels given here actually existing anymore by the time you go to read them" means. It could mean (a) the channel has been closed on-chain or its funding aborted, or (b) we no longer hold any reference to it in our local map (i.e. ChannelHolder::by_id).

Collaborator:

Aren't those two equivalent? I'm not sure what would be more specific.

///
/// See `ChannelManager` struct-level documentation for lock order requirements.
claimable_htlcs: Mutex<HashMap<PaymentHash, (events::PaymentPurpose, Vec<ClaimableHTLC>)>>,

/// The set of outbound SCID aliases across all our channels, including unconfirmed channels
/// and some closed channels which reached a usable state prior to being closed. This is used
/// only to avoid duplicates, and is not persisted explicitly to disk, but rebuilt from the
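
A hedged sketch of the access pattern the standalone claimable_htlcs Mutex above enables, using simplified stand-in types; it also illustrates the doc comment's warning that a referenced channel may no longer exist when it is later looked up:

// Illustrative sketch only: simplified stand-ins for ChannelManager types.
// claimable_htlcs now has its own Mutex, so entries can be removed without
// holding the channel_state lock; any channel they reference must be
// re-checked afterwards because it may have been closed in the meantime.
use std::collections::HashMap;
use std::sync::Mutex;

#[derive(Clone, Copy, PartialEq, Eq, Hash)]
struct PaymentHash([u8; 32]);

struct ClaimableHTLC {
    channel_id: [u8; 32],
    value_msat: u64,
}

struct ChannelHolder {
    by_id: HashMap<[u8; 32], ()>,
}

struct Manager {
    channel_state: Mutex<ChannelHolder>,
    claimable_htlcs: Mutex<HashMap<PaymentHash, Vec<ClaimableHTLC>>>,
}

impl Manager {
    fn claim_payment(&self, payment_hash: PaymentHash) -> u64 {
        // Take and release claimable_htlcs on its own; channel_state is not
        // held across this removal.
        let htlcs = self.claimable_htlcs.lock().unwrap().remove(&payment_hash);
        let mut claimed_msat = 0;
        if let Some(htlcs) = htlcs {
            let channel_state = self.channel_state.lock().unwrap();
            for htlc in htlcs {
                // No consistency guarantee: the channel may be gone by now.
                if channel_state.by_id.contains_key(&htlc.channel_id) {
                    claimed_msat += htlc.value_msat;
                }
            }
        }
        claimed_msat
    }
}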
Expand Down Expand Up @@ -1657,13 +1661,13 @@ impl<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<M, T, K, F

channel_state: Mutex::new(ChannelHolder{
by_id: HashMap::new(),
claimable_htlcs: HashMap::new(),
pending_msg_events: Vec::new(),
}),
outbound_scid_aliases: Mutex::new(HashSet::new()),
pending_inbound_payments: Mutex::new(HashMap::new()),
pending_outbound_payments: Mutex::new(HashMap::new()),
forward_htlcs: Mutex::new(HashMap::new()),
claimable_htlcs: Mutex::new(HashMap::new()),
id_to_peer: Mutex::new(HashMap::new()),
short_to_chan_info: FairRwLock::new(HashMap::new()),

@@ -1902,14 +1906,16 @@ impl<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<M, T, K, F
if *counterparty_node_id != chan_entry.get().get_counterparty_node_id(){
return Err(APIError::APIMisuseError { err: "The passed counterparty_node_id doesn't match the channel's counterparty node_id".to_owned() });
}
let per_peer_state = self.per_peer_state.read().unwrap();
let (shutdown_msg, monitor_update, htlcs) = match per_peer_state.get(&counterparty_node_id) {
Some(peer_state) => {
let peer_state = peer_state.lock().unwrap();
let their_features = &peer_state.latest_features;
chan_entry.get_mut().get_shutdown(&self.keys_manager, their_features, target_feerate_sats_per_1000_weight)?
},
None => return Err(APIError::ChannelUnavailable { err: format!("Not connected to node: {}", counterparty_node_id) }),
let (shutdown_msg, monitor_update, htlcs) = {
let per_peer_state = self.per_peer_state.read().unwrap();
match per_peer_state.get(&counterparty_node_id) {
Some(peer_state) => {
let peer_state = peer_state.lock().unwrap();
let their_features = &peer_state.latest_features;
chan_entry.get_mut().get_shutdown(&self.keys_manager, their_features, target_feerate_sats_per_1000_weight)?
},
None => return Err(APIError::ChannelUnavailable { err: format!("Not connected to node: {}", counterparty_node_id) }),
}
};
failed_htlcs = htlcs;

@@ -3142,8 +3148,6 @@ impl<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<M, T, K, F
mem::swap(&mut forward_htlcs, &mut self.forward_htlcs.lock().unwrap());

for (short_chan_id, mut pending_forwards) in forward_htlcs {
let mut channel_state_lock = self.channel_state.lock().unwrap();
let channel_state = &mut *channel_state_lock;
if short_chan_id != 0 {
macro_rules! forwarding_channel_not_found {
() => {
@@ -3242,6 +3246,8 @@ impl<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<M, T, K, F
continue;
}
};
let mut channel_state_lock = self.channel_state.lock().unwrap();
let channel_state = &mut *channel_state_lock;
match channel_state.by_id.entry(forward_chan_id) {
hash_map::Entry::Vacant(_) => {
forwarding_channel_not_found!();
@@ -3434,7 +3440,8 @@ impl<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<M, T, K, F
payment_secret: $payment_data.payment_secret,
}
};
let (_, htlcs) = channel_state.claimable_htlcs.entry(payment_hash)
let mut claimable_htlcs = self.claimable_htlcs.lock().unwrap();
let (_, htlcs) = claimable_htlcs.entry(payment_hash)
.or_insert_with(|| (purpose(), Vec::new()));
if htlcs.len() == 1 {
if let OnionPayload::Spontaneous(_) = htlcs[0].onion_payload {
@@ -3502,7 +3509,7 @@ impl<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<M, T, K, F
check_total_value!(payment_data, payment_preimage);
},
OnionPayload::Spontaneous(preimage) => {
match channel_state.claimable_htlcs.entry(payment_hash) {
match self.claimable_htlcs.lock().unwrap().entry(payment_hash) {
hash_map::Entry::Vacant(e) => {
let purpose = events::PaymentPurpose::SpontaneousPayment(preimage);
e.insert((purpose.clone(), vec![claimable_htlc]));
@@ -3791,29 +3798,29 @@ impl<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<M, T, K, F

true
});
}

channel_state.claimable_htlcs.retain(|payment_hash, (_, htlcs)| {
if htlcs.is_empty() {
// This should be unreachable
debug_assert!(false);
self.claimable_htlcs.lock().unwrap().retain(|payment_hash, (_, htlcs)| {
if htlcs.is_empty() {
// This should be unreachable
debug_assert!(false);
return false;
}
if let OnionPayload::Invoice { .. } = htlcs[0].onion_payload {
// Check if we've received all the parts we need for an MPP (the value of the parts adds to total_msat).
// In this case we're not going to handle any timeouts of the parts here.
if htlcs[0].total_msat == htlcs.iter().fold(0, |total, htlc| total + htlc.value) {
return true;
} else if htlcs.into_iter().any(|htlc| {
htlc.timer_ticks += 1;
return htlc.timer_ticks >= MPP_TIMEOUT_TICKS
}) {
timed_out_mpp_htlcs.extend(htlcs.into_iter().map(|htlc| (htlc.prev_hop.clone(), payment_hash.clone())));
return false;
}
if let OnionPayload::Invoice { .. } = htlcs[0].onion_payload {
// Check if we've received all the parts we need for an MPP (the value of the parts adds to total_msat).
// In this case we're not going to handle any timeouts of the parts here.
if htlcs[0].total_msat == htlcs.iter().fold(0, |total, htlc| total + htlc.value) {
return true;
} else if htlcs.into_iter().any(|htlc| {
htlc.timer_ticks += 1;
return htlc.timer_ticks >= MPP_TIMEOUT_TICKS
}) {
timed_out_mpp_htlcs.extend(htlcs.into_iter().map(|htlc| (htlc.prev_hop.clone(), payment_hash.clone())));
return false;
}
}
true
});
}
}
true
});

for htlc_source in timed_out_mpp_htlcs.drain(..) {
let receiver = HTLCDestination::FailedPayment { payment_hash: htlc_source.1 };
@@ -3846,10 +3853,7 @@ impl<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<M, T, K, F
pub fn fail_htlc_backwards(&self, payment_hash: &PaymentHash) {
let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier);

let removed_source = {
let mut channel_state = self.channel_state.lock().unwrap();
channel_state.claimable_htlcs.remove(payment_hash)
};
let removed_source = self.claimable_htlcs.lock().unwrap().remove(payment_hash);
if let Some((_, mut sources)) = removed_source {
for htlc in sources.drain(..) {
let mut htlc_msat_height_data = byte_utils::be64_to_array(htlc.value).to_vec();
@@ -4151,7 +4155,7 @@ impl<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<M, T, K, F

let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier);

let removed_source = self.channel_state.lock().unwrap().claimable_htlcs.remove(&payment_hash);
let removed_source = self.claimable_htlcs.lock().unwrap().remove(&payment_hash);
if let Some((payment_purpose, mut sources)) = removed_source {
assert!(!sources.is_empty());

@@ -6019,28 +6023,28 @@ where
}
true
});
}

if let Some(height) = height_opt {
channel_state.claimable_htlcs.retain(|payment_hash, (_, htlcs)| {
htlcs.retain(|htlc| {
// If height is approaching the number of blocks we think it takes us to get
// our commitment transaction confirmed before the HTLC expires, plus the
// number of blocks we generally consider it to take to do a commitment update,
// just give up on it and fail the HTLC.
if height >= htlc.cltv_expiry - HTLC_FAIL_BACK_BUFFER {
let mut htlc_msat_height_data = byte_utils::be64_to_array(htlc.value).to_vec();
htlc_msat_height_data.extend_from_slice(&byte_utils::be32_to_array(height));

timed_out_htlcs.push((HTLCSource::PreviousHopData(htlc.prev_hop.clone()), payment_hash.clone(), HTLCFailReason::Reason {
failure_code: 0x4000 | 15,
data: htlc_msat_height_data
}, HTLCDestination::FailedPayment { payment_hash: payment_hash.clone() }));
false
} else { true }
});
!htlcs.is_empty() // Only retain this entry if htlcs has at least one entry.
if let Some(height) = height_opt {
self.claimable_htlcs.lock().unwrap().retain(|payment_hash, (_, htlcs)| {
htlcs.retain(|htlc| {
// If height is approaching the number of blocks we think it takes us to get
// our commitment transaction confirmed before the HTLC expires, plus the
// number of blocks we generally consider it to take to do a commitment update,
// just give up on it and fail the HTLC.
if height >= htlc.cltv_expiry - HTLC_FAIL_BACK_BUFFER {
let mut htlc_msat_height_data = byte_utils::be64_to_array(htlc.value).to_vec();
htlc_msat_height_data.extend_from_slice(&byte_utils::be32_to_array(height));

timed_out_htlcs.push((HTLCSource::PreviousHopData(htlc.prev_hop.clone()), payment_hash.clone(), HTLCFailReason::Reason {
failure_code: 0x4000 | 15,
data: htlc_msat_height_data
}, HTLCDestination::FailedPayment { payment_hash: payment_hash.clone() }));
false
} else { true }
});
}
!htlcs.is_empty() // Only retain this entry if htlcs has at least one entry.
});
}

self.handle_init_event_channel_failures(failed_channels);
@@ -6775,10 +6779,13 @@ impl<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> Writeable for ChannelMana
}
}

let channel_state = self.channel_state.lock().unwrap();
let pending_inbound_payments = self.pending_inbound_payments.lock().unwrap();
let claimable_htlcs = self.claimable_htlcs.lock().unwrap();
let pending_outbound_payments = self.pending_outbound_payments.lock().unwrap();

let mut htlc_purposes: Vec<&events::PaymentPurpose> = Vec::new();
(channel_state.claimable_htlcs.len() as u64).write(writer)?;
for (payment_hash, (purpose, previous_hops)) in channel_state.claimable_htlcs.iter() {
(claimable_htlcs.len() as u64).write(writer)?;
for (payment_hash, (purpose, previous_hops)) in claimable_htlcs.iter() {
payment_hash.write(writer)?;
(previous_hops.len() as u64).write(writer)?;
for htlc in previous_hops.iter() {
@@ -6795,8 +6802,6 @@ impl<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> Writeable for ChannelMana
peer_state.latest_features.write(writer)?;
}

let pending_inbound_payments = self.pending_inbound_payments.lock().unwrap();
let pending_outbound_payments = self.pending_outbound_payments.lock().unwrap();
let events = self.pending_events.lock().unwrap();
(events.len() as u64).write(writer)?;
for event in events.iter() {
@@ -7389,14 +7394,14 @@ impl<'a, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>

channel_state: Mutex::new(ChannelHolder {
by_id,
claimable_htlcs,
pending_msg_events: Vec::new(),
}),
inbound_payment_key: expanded_inbound_key,
pending_inbound_payments: Mutex::new(pending_inbound_payments),
pending_outbound_payments: Mutex::new(pending_outbound_payments.unwrap()),

forward_htlcs: Mutex::new(forward_htlcs),
claimable_htlcs: Mutex::new(claimable_htlcs),
outbound_scid_aliases: Mutex::new(outbound_scid_aliases),
id_to_peer: Mutex::new(id_to_peer),
short_to_chan_info: FairRwLock::new(short_to_chan_info),