Skip to content

Commit c187564

Browse files
committed
Always process claim ChannelMonitorUpdates asynchronously
We currently have two codepaths on most channel update functions - most methods return a set of messages to send a peer if and only if the `ChannelMonitorUpdate` succeeds, but if it does not we push the messages back into the `Channel` and then pull them back out when the `ChannelMonitorUpdate` completes and send them then. This adds a substantial amount of complexity in very critical codepaths. Instead, here we swap all our channel update codepaths to immediately set the channel-update-required flag and only return a `ChannelMonitorUpdate` to the `ChannelManager`. Internally in the `Channel` we store a queue of `ChannelMonitorUpdate`s, which will become critical in future work to surface pending `ChannelMonitorUpdate`s to users at startup so that users can complete them. This leaves some redundant work in `Channel` to be cleaned up later. Specifically, we still generate the messages which we will now ignore and regenerate later. This commit updates the `ChannelMonitorUpdate` pipeline for `claim_funds_from_hop`, i.e. the `update_fulfill_htlc`-generation pipeline.
1 parent 538dc49 commit c187564

File tree

4 files changed

+89
-85
lines changed

4 files changed

+89
-85
lines changed

lightning/src/chain/chainmonitor.rs

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -796,7 +796,7 @@ mod tests {
796796
use crate::ln::functional_test_utils::*;
797797
use crate::ln::msgs::ChannelMessageHandler;
798798
use crate::util::errors::APIError;
799-
use crate::util::events::{ClosureReason, MessageSendEvent, MessageSendEventsProvider};
799+
use crate::util::events::{Event, ClosureReason, MessageSendEvent, MessageSendEventsProvider};
800800

801801
#[test]
802802
fn test_async_ooo_offchain_updates() {
@@ -818,10 +818,8 @@ mod tests {
818818

819819
nodes[1].node.claim_funds(payment_preimage_1);
820820
check_added_monitors!(nodes[1], 1);
821-
expect_payment_claimed!(nodes[1], payment_hash_1, 1_000_000);
822821
nodes[1].node.claim_funds(payment_preimage_2);
823822
check_added_monitors!(nodes[1], 1);
824-
expect_payment_claimed!(nodes[1], payment_hash_2, 1_000_000);
825823

826824
chanmon_cfgs[1].persister.set_update_ret(ChannelMonitorUpdateStatus::Completed);
827825

@@ -851,8 +849,24 @@ mod tests {
851849
.find(|(txo, _)| txo == funding_txo).unwrap().1.contains(&next_update));
852850
assert!(nodes[1].chain_monitor.release_pending_monitor_events().is_empty());
853851
assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty());
852+
assert!(nodes[1].node.get_and_clear_pending_events().is_empty());
854853
nodes[1].chain_monitor.chain_monitor.channel_monitor_updated(*funding_txo, update_iter.next().unwrap().clone()).unwrap();
855854

855+
let claim_events = nodes[1].node.get_and_clear_pending_events();
856+
assert_eq!(claim_events.len(), 2);
857+
match claim_events[0] {
858+
Event::PaymentClaimed { ref payment_hash, amount_msat: 1_000_000, .. } => {
859+
assert_eq!(payment_hash_1, *payment_hash);
860+
},
861+
_ => panic!("Unexpected event"),
862+
}
863+
match claim_events[1] {
864+
Event::PaymentClaimed { ref payment_hash, amount_msat: 1_000_000, .. } => {
865+
assert_eq!(payment_hash_2, *payment_hash);
866+
},
867+
_ => panic!("Unexpected event"),
868+
}
869+
856870
// Now manually walk the commitment signed dance - because we claimed two payments
857871
// back-to-back it doesn't fit into the neat walk commitment_signed_dance does.
858872

lightning/src/ln/chanmon_update_fail_tests.rs

Lines changed: 22 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1590,7 +1590,6 @@ fn test_monitor_update_fail_claim() {
15901590

15911591
chanmon_cfgs[1].persister.set_update_ret(ChannelMonitorUpdateStatus::InProgress);
15921592
nodes[1].node.claim_funds(payment_preimage_1);
1593-
expect_payment_claimed!(nodes[1], payment_hash_1, 1_000_000);
15941593
assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty());
15951594
check_added_monitors!(nodes[1], 1);
15961595

@@ -1616,6 +1615,7 @@ fn test_monitor_update_fail_claim() {
16161615
let events = nodes[1].node.get_and_clear_pending_msg_events();
16171616
assert_eq!(events.len(), 0);
16181617
commitment_signed_dance!(nodes[1], nodes[2], payment_event.commitment_msg, false, true);
1618+
expect_pending_htlcs_forwardable_ignore!(nodes[1]);
16191619

16201620
let (_, payment_hash_3, payment_secret_3) = get_payment_preimage_hash!(nodes[0]);
16211621
nodes[2].node.send_payment(&route, payment_hash_3, &Some(payment_secret_3), PaymentId(payment_hash_3.0)).unwrap();
@@ -1633,6 +1633,7 @@ fn test_monitor_update_fail_claim() {
16331633
let channel_id = chan_1.2;
16341634
let (outpoint, latest_update, _) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone();
16351635
nodes[1].chain_monitor.chain_monitor.force_channel_monitor_updated(outpoint, latest_update);
1636+
expect_payment_claimed!(nodes[1], payment_hash_1, 1_000_000);
16361637
check_added_monitors!(nodes[1], 0);
16371638

16381639
let bs_fulfill_update = get_htlc_update_msgs!(nodes[1], nodes[0].node.get_our_node_id());
@@ -1641,7 +1642,7 @@ fn test_monitor_update_fail_claim() {
16411642
expect_payment_sent!(nodes[0], payment_preimage_1);
16421643

16431644
// Get the payment forwards, note that they were batched into one commitment update.
1644-
expect_pending_htlcs_forwardable!(nodes[1]);
1645+
nodes[1].node.process_pending_htlc_forwards();
16451646
check_added_monitors!(nodes[1], 1);
16461647
let bs_forward_update = get_htlc_update_msgs!(nodes[1], nodes[0].node.get_our_node_id());
16471648
nodes[0].node.handle_update_add_htlc(&nodes[1].node.get_our_node_id(), &bs_forward_update.update_add_htlcs[0]);
@@ -1784,14 +1785,14 @@ fn monitor_update_claim_fail_no_response() {
17841785

17851786
chanmon_cfgs[1].persister.set_update_ret(ChannelMonitorUpdateStatus::InProgress);
17861787
nodes[1].node.claim_funds(payment_preimage_1);
1787-
expect_payment_claimed!(nodes[1], payment_hash_1, 1_000_000);
17881788
check_added_monitors!(nodes[1], 1);
17891789

17901790
assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty());
17911791

17921792
chanmon_cfgs[1].persister.set_update_ret(ChannelMonitorUpdateStatus::Completed);
17931793
let (outpoint, latest_update, _) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone();
17941794
nodes[1].chain_monitor.chain_monitor.force_channel_monitor_updated(outpoint, latest_update);
1795+
expect_payment_claimed!(nodes[1], payment_hash_1, 1_000_000);
17951796
check_added_monitors!(nodes[1], 0);
17961797
assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty());
17971798

@@ -2270,7 +2271,6 @@ fn do_channel_holding_cell_serialize(disconnect: bool, reload_a: bool) {
22702271
chanmon_cfgs[0].persister.set_update_ret(ChannelMonitorUpdateStatus::InProgress);
22712272
nodes[0].node.claim_funds(payment_preimage_0);
22722273
check_added_monitors!(nodes[0], 1);
2273-
expect_payment_claimed!(nodes[0], payment_hash_0, 100_000);
22742274

22752275
nodes[1].node.handle_update_add_htlc(&nodes[0].node.get_our_node_id(), &send.msgs[0]);
22762276
nodes[1].node.handle_commitment_signed(&nodes[0].node.get_our_node_id(), &send.commitment_msg);
@@ -2333,6 +2333,7 @@ fn do_channel_holding_cell_serialize(disconnect: bool, reload_a: bool) {
23332333
chanmon_cfgs[0].persister.set_update_ret(ChannelMonitorUpdateStatus::Completed);
23342334
let (funding_txo, mon_id, _) = nodes[0].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&chan_id).unwrap().clone();
23352335
nodes[0].chain_monitor.chain_monitor.force_channel_monitor_updated(funding_txo, mon_id);
2336+
expect_payment_claimed!(nodes[0], payment_hash_0, 100_000);
23362337

23372338
// New outbound messages should be generated immediately upon a call to
23382339
// get_and_clear_pending_msg_events (but not before).
@@ -2631,15 +2632,13 @@ fn double_temp_error() {
26312632
// `claim_funds` results in a ChannelMonitorUpdate.
26322633
nodes[1].node.claim_funds(payment_preimage_1);
26332634
check_added_monitors!(nodes[1], 1);
2634-
expect_payment_claimed!(nodes[1], payment_hash_1, 1_000_000);
26352635
let (funding_tx, latest_update_1, _) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone();
26362636

26372637
chanmon_cfgs[1].persister.set_update_ret(ChannelMonitorUpdateStatus::InProgress);
26382638
// Previously, this would've panicked due to a double-call to `Channel::monitor_update_failed`,
26392639
// which had some asserts that prevented it from being called twice.
26402640
nodes[1].node.claim_funds(payment_preimage_2);
26412641
check_added_monitors!(nodes[1], 1);
2642-
expect_payment_claimed!(nodes[1], payment_hash_2, 1_000_000);
26432642
chanmon_cfgs[1].persister.set_update_ret(ChannelMonitorUpdateStatus::Completed);
26442643

26452644
let (_, latest_update_2, _) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone();
@@ -2648,11 +2647,24 @@ fn double_temp_error() {
26482647
check_added_monitors!(nodes[1], 0);
26492648
nodes[1].chain_monitor.chain_monitor.force_channel_monitor_updated(funding_tx, latest_update_2);
26502649

2651-
// Complete the first HTLC.
2652-
let events = nodes[1].node.get_and_clear_pending_msg_events();
2653-
assert_eq!(events.len(), 1);
2650+
// Complete the first HTLC. Note that as a side-effect we handle the monitor update completions
2651+
// and get both PaymentClaimed events at once.
2652+
let msg_events = nodes[1].node.get_and_clear_pending_msg_events();
2653+
2654+
let events = nodes[1].node.get_and_clear_pending_events();
2655+
assert_eq!(events.len(), 2);
2656+
match events[0] {
2657+
Event::PaymentClaimed { amount_msat: 1_000_000, payment_hash, .. } => assert_eq!(payment_hash, payment_hash_1),
2658+
_ => panic!("Unexpected Event: {:?}", events[0]),
2659+
}
2660+
match events[1] {
2661+
Event::PaymentClaimed { amount_msat: 1_000_000, payment_hash, .. } => assert_eq!(payment_hash, payment_hash_2),
2662+
_ => panic!("Unexpected Event: {:?}", events[1]),
2663+
}
2664+
2665+
assert_eq!(msg_events.len(), 1);
26542666
let (update_fulfill_1, commitment_signed_b1, node_id) = {
2655-
match &events[0] {
2667+
match &msg_events[0] {
26562668
&MessageSendEvent::UpdateHTLCs { ref node_id, updates: msgs::CommitmentUpdate { ref update_add_htlcs, ref update_fulfill_htlcs, ref update_fail_htlcs, ref update_fail_malformed_htlcs, ref update_fee, ref commitment_signed } } => {
26572669
assert!(update_add_htlcs.is_empty());
26582670
assert_eq!(update_fulfill_htlcs.len(), 1);

lightning/src/ln/channel.rs

Lines changed: 21 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -391,18 +391,15 @@ enum UpdateFulfillFetch {
391391
}
392392

393393
/// The return type of get_update_fulfill_htlc_and_commit.
394-
pub enum UpdateFulfillCommitFetch {
394+
pub enum UpdateFulfillCommitFetch<'a> {
395395
/// Indicates the HTLC fulfill is new, and either generated an update_fulfill message, placed
396396
/// it in the holding cell, or re-generated the update_fulfill message after the same claim was
397397
/// previously placed in the holding cell (and has since been removed).
398398
NewClaim {
399399
/// The ChannelMonitorUpdate which places the new payment preimage in the channel monitor
400-
monitor_update: ChannelMonitorUpdate,
400+
monitor_update: &'a ChannelMonitorUpdate,
401401
/// The value of the HTLC which was claimed, in msat.
402402
htlc_value_msat: u64,
403-
/// The update_fulfill message and commitment_signed message (if the claim was not placed
404-
/// in the holding cell).
405-
msgs: Option<(msgs::UpdateFulfillHTLC, msgs::CommitmentSigned)>,
406403
},
407404
/// Indicates the HTLC fulfill is duplicative and already existed either in the holding cell
408405
/// or has been forgotten (presumably previously claimed).
@@ -1922,22 +1919,30 @@ impl<Signer: Sign> Channel<Signer> {
19221919
}
19231920
}
19241921

1925-
pub fn get_update_fulfill_htlc_and_commit<L: Deref>(&mut self, htlc_id: u64, payment_preimage: PaymentPreimage, logger: &L) -> Result<UpdateFulfillCommitFetch, (ChannelError, ChannelMonitorUpdate)> where L::Target: Logger {
1922+
pub fn get_update_fulfill_htlc_and_commit<L: Deref>(&mut self, htlc_id: u64, payment_preimage: PaymentPreimage, logger: &L) -> UpdateFulfillCommitFetch where L::Target: Logger {
19261923
match self.get_update_fulfill_htlc(htlc_id, payment_preimage, logger) {
1927-
UpdateFulfillFetch::NewClaim { mut monitor_update, htlc_value_msat, msg: Some(update_fulfill_htlc) } => {
1928-
let (commitment, mut additional_update) = match self.send_commitment_no_status_check(logger) {
1929-
Err(e) => return Err((e, monitor_update)),
1930-
Ok(res) => res
1931-
};
1932-
// send_commitment_no_status_check may bump latest_monitor_id but we want them to be
1924+
UpdateFulfillFetch::NewClaim { mut monitor_update, htlc_value_msat, msg: Some(_) } => {
1925+
let mut additional_update = self.build_commitment_no_status_check(logger);
1926+
// build_commitment_no_status_check may bump latest_monitor_id but we want them to be
19331927
// strictly increasing by one, so decrement it here.
19341928
self.latest_monitor_update_id = monitor_update.update_id;
19351929
monitor_update.updates.append(&mut additional_update.updates);
1936-
Ok(UpdateFulfillCommitFetch::NewClaim { monitor_update, htlc_value_msat, msgs: Some((update_fulfill_htlc, commitment)) })
1930+
self.monitor_updating_paused(false, true, false, Vec::new(), Vec::new(), Vec::new());
1931+
self.pending_monitor_updates.push(monitor_update);
1932+
UpdateFulfillCommitFetch::NewClaim {
1933+
monitor_update: self.pending_monitor_updates.last().unwrap(),
1934+
htlc_value_msat,
1935+
}
19371936
},
1938-
UpdateFulfillFetch::NewClaim { monitor_update, htlc_value_msat, msg: None } =>
1939-
Ok(UpdateFulfillCommitFetch::NewClaim { monitor_update, htlc_value_msat, msgs: None }),
1940-
UpdateFulfillFetch::DuplicateClaim {} => Ok(UpdateFulfillCommitFetch::DuplicateClaim {}),
1937+
UpdateFulfillFetch::NewClaim { monitor_update, htlc_value_msat, msg: None } => {
1938+
self.monitor_updating_paused(false, false, false, Vec::new(), Vec::new(), Vec::new());
1939+
self.pending_monitor_updates.push(monitor_update);
1940+
UpdateFulfillCommitFetch::NewClaim {
1941+
monitor_update: self.pending_monitor_updates.last().unwrap(),
1942+
htlc_value_msat,
1943+
}
1944+
}
1945+
UpdateFulfillFetch::DuplicateClaim {} => UpdateFulfillCommitFetch::DuplicateClaim {},
19411946
}
19421947
}
19431948

lightning/src/ln/channelmanager.rs

Lines changed: 29 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -4344,60 +4344,35 @@ impl<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<M, T, K, F
43444344
let chan_id = prev_hop.outpoint.to_channel_id();
43454345
let channel_state = &mut *channel_state_lock;
43464346
if let hash_map::Entry::Occupied(mut chan) = channel_state.by_id.entry(chan_id) {
4347-
match chan.get_mut().get_update_fulfill_htlc_and_commit(prev_hop.htlc_id, payment_preimage, &self.logger) {
4348-
Ok(msgs_monitor_option) => {
4349-
if let UpdateFulfillCommitFetch::NewClaim { msgs, htlc_value_msat, monitor_update } = msgs_monitor_option {
4350-
match self.chain_monitor.update_channel(chan.get().get_funding_txo().unwrap(), &monitor_update) {
4351-
ChannelMonitorUpdateStatus::Completed => {},
4352-
e => {
4353-
log_given_level!(self.logger, if e == ChannelMonitorUpdateStatus::PermanentFailure { Level::Error } else { Level::Debug },
4354-
"Failed to update channel monitor with preimage {:?}: {:?}",
4355-
payment_preimage, e);
4356-
self.handle_monitor_update_completion_actions(completion_action(Some(htlc_value_msat)));
4357-
return Err((
4358-
chan.get().get_counterparty_node_id(),
4359-
handle_monitor_update_res!(self, e, chan, RAACommitmentOrder::CommitmentFirst, false, msgs.is_some()).unwrap_err(),
4360-
));
4361-
}
4362-
}
4363-
if let Some((msg, commitment_signed)) = msgs {
4364-
log_debug!(self.logger, "Claiming funds for HTLC with preimage {} resulted in a commitment_signed for channel {}",
4365-
log_bytes!(payment_preimage.0), log_bytes!(chan.get().channel_id()));
4366-
channel_state.pending_msg_events.push(events::MessageSendEvent::UpdateHTLCs {
4367-
node_id: chan.get().get_counterparty_node_id(),
4368-
updates: msgs::CommitmentUpdate {
4369-
update_add_htlcs: Vec::new(),
4370-
update_fulfill_htlcs: vec![msg],
4371-
update_fail_htlcs: Vec::new(),
4372-
update_fail_malformed_htlcs: Vec::new(),
4373-
update_fee: None,
4374-
commitment_signed,
4375-
}
4376-
});
4377-
}
4378-
self.handle_monitor_update_completion_actions(completion_action(Some(htlc_value_msat)));
4379-
Ok(())
4380-
} else {
4381-
Ok(())
4382-
}
4383-
},
4384-
Err((e, monitor_update)) => {
4385-
match self.chain_monitor.update_channel(chan.get().get_funding_txo().unwrap(), &monitor_update) {
4386-
ChannelMonitorUpdateStatus::Completed => {},
4387-
e => {
4388-
log_given_level!(self.logger, if e == ChannelMonitorUpdateStatus::PermanentFailure { Level::Error } else { Level::Info },
4389-
"Failed to update channel monitor with preimage {:?} immediately prior to force-close: {:?}",
4390-
payment_preimage, e);
4391-
},
4392-
}
4393-
let counterparty_node_id = chan.get().get_counterparty_node_id();
4394-
let (drop, res) = convert_chan_err!(self, e, chan.get_mut(), &chan_id);
4395-
if drop {
4396-
chan.remove_entry();
4397-
}
4398-
self.handle_monitor_update_completion_actions(completion_action(None));
4399-
Err((counterparty_node_id, res))
4400-
},
4347+
let counterparty_node_id = chan.get().get_counterparty_node_id();
4348+
let push_action = |htlc_value_msat| {
4349+
if let Some(action) = completion_action(htlc_value_msat) {
4350+
let per_peer_state = self.per_peer_state.read().unwrap();
4351+
let mut peer_state = per_peer_state.get(&counterparty_node_id)
4352+
.expect("XXX: This may be reachable today, I believe, but once we move the channel storage to per_peer_state it won't be.")
4353+
.lock().unwrap();
4354+
log_trace!(self.logger, "Tracking monitor update completion action for channel {}: {:?}",
4355+
log_bytes!(chan_id), action);
4356+
peer_state.monitor_update_blocked_actions.entry(chan_id).or_insert(Vec::new()).push(action);
4357+
}
4358+
};
4359+
4360+
let monitor_option = chan.get_mut().get_update_fulfill_htlc_and_commit(prev_hop.htlc_id, payment_preimage, &self.logger);
4361+
if let UpdateFulfillCommitFetch::NewClaim { htlc_value_msat, monitor_update } = monitor_option {
4362+
push_action(Some(htlc_value_msat));
4363+
let update_id = monitor_update.update_id;
4364+
let update_res = self.chain_monitor.update_channel(prev_hop.outpoint, monitor_update);
4365+
let res = handle_new_monitor_update!(self, update_res, update_id, channel_state_lock, channel_state.pending_msg_events, chan);
4366+
if let Err(e) = res {
4367+
// TODO: This is a *critical* error - we probably updated some other
4368+
// monitors with a preimage. We should retry this monitor udpate over
4369+
// and over again until morale improves.
4370+
log_error!(self.logger, "Failed to update channel monitor with preimage {:?}", payment_preimage);
4371+
return Err((counterparty_node_id, e));
4372+
}
4373+
Ok(())
4374+
} else {
4375+
Ok(())
44014376
}
44024377
} else {
44034378
let preimage_update = ChannelMonitorUpdate {
@@ -7229,8 +7204,6 @@ impl<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> Writeable for ChannelMana
72297204
// LDK versions prior to 0.0.113 do not know how to read the pending claimed payments
72307205
// map. Thus, if there are no entries we skip writing a TLV for it.
72317206
pending_claimed_payments = None;
7232-
} else {
7233-
debug_assert!(false, "While we have code to serialize pending_claimed_payments, the map should always be empty until a later PR");
72347207
}
72357208

72367209
write_tlv_fields!(writer, {

0 commit comments

Comments
 (0)