Commit 197d9a6

Always process holding cell ChannelMonitorUpdates asynchronously
We currently have two codepaths on most channel update functions - most methods return a set of messages to send a peer iff the `ChannelMonitorUpdate` succeeds, but if it does not we push the messages back into the `Channel` and then pull them back out when the `ChannelMonitorUpdate` completes and send them then. This adds a substantial amount of complexity in very critical codepaths.

Instead, here we swap all our channel update codepaths to immediately set the channel-update-required flag and only return a `ChannelMonitorUpdate` to the `ChannelManager`. Internally in the `Channel` we store a queue of `ChannelMonitorUpdate`s, which will become critical in future work to surface pending `ChannelMonitorUpdate`s to users at startup so they can complete.

This leaves some redundant work in `Channel` to be cleaned up later. Specifically, we still generate the messages which we will now ignore and regenerate later.

This commit updates the `ChannelMonitorUpdate` pipeline when freeing the holding cell, including when initiating shutdown.
1 parent c187564 commit 197d9a6
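
To make the queued-update model concrete, the following is a minimal, self-contained sketch of the pattern described above: the channel buffers `ChannelMonitorUpdate`s in a `pending_monitor_updates` queue, marks itself as awaiting monitor completion, and hands the caller a reference to the most recently queued update. The type definitions and helper names (`push_monitor_update`, `monitor_update_completed`) are simplified stand-ins for illustration, not LDK's actual API.

	// Simplified stand-ins -- not LDK's real types.
	struct ChannelMonitorUpdate {
		update_id: u64,
		// ...update steps elided
	}

	struct Channel {
		latest_monitor_update_id: u64,
		monitor_update_in_progress: bool,
		pending_monitor_updates: Vec<ChannelMonitorUpdate>,
	}

	impl Channel {
		// Queue a monitor update and return a reference the ChannelManager can hand
		// to the chain monitor. The channel is "paused" until the update completes,
		// so any messages generated now are simply regenerated later.
		fn push_monitor_update(&mut self) -> &ChannelMonitorUpdate {
			self.latest_monitor_update_id += 1;
			self.monitor_update_in_progress = true;
			self.pending_monitor_updates.push(ChannelMonitorUpdate {
				update_id: self.latest_monitor_update_id,
			});
			self.pending_monitor_updates.last().unwrap()
		}

		// Called once persistence of `update_id` (and everything before it) completes.
		fn monitor_update_completed(&mut self, update_id: u64) {
			self.pending_monitor_updates.retain(|upd| upd.update_id > update_id);
			if self.pending_monitor_updates.is_empty() {
				self.monitor_update_in_progress = false;
				// ...regenerate and send the messages that were held back
			}
		}
	}

	fn main() {
		let mut chan = Channel {
			latest_monitor_update_id: 0,
			monitor_update_in_progress: false,
			pending_monitor_updates: Vec::new(),
		};
		let update_id = chan.push_monitor_update().update_id;
		// ...persist the ChannelMonitorUpdate out of band, then:
		chan.monitor_update_completed(update_id);
		assert!(!chan.monitor_update_in_progress);
	}

The point of the queue is that callers never have to decide between "send now" and "stash for later": messages are always regenerated from channel state once the monitor write completes.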

File tree

lightning/src/ln/chanmon_update_fail_tests.rs
lightning/src/ln/channel.rs
lightning/src/ln/channelmanager.rs

3 files changed: +71 −80 lines

lightning/src/ln/chanmon_update_fail_tests.rs

Lines changed: 9 additions & 1 deletion
@@ -2587,7 +2587,15 @@ fn test_permanent_error_during_sending_shutdown() {
 	chanmon_cfgs[0].persister.set_update_ret(ChannelMonitorUpdateStatus::PermanentFailure);
 
 	assert!(nodes[0].node.close_channel(&channel_id, &nodes[1].node.get_our_node_id()).is_ok());
-	check_closed_broadcast!(nodes[0], true);
+
+	// We always send the `shutdown` response when initiating a shutdown, even if we immediately
+	// close the channel thereafter.
+	let msg_events = nodes[0].node.get_and_clear_pending_msg_events();
+	assert_eq!(msg_events.len(), 3);
+	if let MessageSendEvent::SendShutdown { .. } = msg_events[0] {} else { panic!(); }
+	if let MessageSendEvent::BroadcastChannelUpdate { .. } = msg_events[1] {} else { panic!(); }
+	if let MessageSendEvent::HandleError { .. } = msg_events[2] {} else { panic!(); }
+
 	check_added_monitors!(nodes[0], 2);
 	check_closed_event!(nodes[0], 1, ClosureReason::ProcessingError { err: "ChannelMonitor storage failure".to_string() });
 }

lightning/src/ln/channel.rs

Lines changed: 25 additions & 29 deletions
@@ -3161,16 +3161,16 @@ impl<Signer: Sign> Channel<Signer> {
 	/// Public version of the below, checking relevant preconditions first.
 	/// If we're not in a state where freeing the holding cell makes sense, this is a no-op and
 	/// returns `(None, Vec::new())`.
-	pub fn maybe_free_holding_cell_htlcs<L: Deref>(&mut self, logger: &L) -> Result<(Option<(msgs::CommitmentUpdate, ChannelMonitorUpdate)>, Vec<(HTLCSource, PaymentHash)>), ChannelError> where L::Target: Logger {
+	pub fn maybe_free_holding_cell_htlcs<L: Deref>(&mut self, logger: &L) -> (Option<&ChannelMonitorUpdate>, Vec<(HTLCSource, PaymentHash)>) where L::Target: Logger {
 		if self.channel_state >= ChannelState::ChannelReady as u32 &&
 		   (self.channel_state & (ChannelState::AwaitingRemoteRevoke as u32 | ChannelState::PeerDisconnected as u32 | ChannelState::MonitorUpdateInProgress as u32)) == 0 {
 			self.free_holding_cell_htlcs(logger)
-		} else { Ok((None, Vec::new())) }
+		} else { (None, Vec::new()) }
 	}
 
 	/// Frees any pending commitment updates in the holding cell, generating the relevant messages
 	/// for our counterparty.
-	fn free_holding_cell_htlcs<L: Deref>(&mut self, logger: &L) -> Result<(Option<(msgs::CommitmentUpdate, ChannelMonitorUpdate)>, Vec<(HTLCSource, PaymentHash)>), ChannelError> where L::Target: Logger {
+	fn free_holding_cell_htlcs<L: Deref>(&mut self, logger: &L) -> (Option<&ChannelMonitorUpdate>, Vec<(HTLCSource, PaymentHash)>) where L::Target: Logger {
 		assert_eq!(self.channel_state & ChannelState::MonitorUpdateInProgress as u32, 0);
 		if self.holding_cell_htlc_updates.len() != 0 || self.holding_cell_update_fee.is_some() {
 			log_trace!(logger, "Freeing holding cell with {} HTLC updates{} in channel {}", self.holding_cell_htlc_updates.len(),
@@ -3251,16 +3251,16 @@ impl<Signer: Sign> Channel<Signer> {
 				}
 			}
 			if update_add_htlcs.is_empty() && update_fulfill_htlcs.is_empty() && update_fail_htlcs.is_empty() && self.holding_cell_update_fee.is_none() {
-				return Ok((None, htlcs_to_fail));
+				return (None, htlcs_to_fail);
 			}
 			let update_fee = if let Some(feerate) = self.holding_cell_update_fee.take() {
 				self.send_update_fee(feerate, false, logger)
 			} else {
 				None
 			};
 
-			let (commitment_signed, mut additional_update) = self.send_commitment_no_status_check(logger)?;
-			// send_commitment_no_status_check and get_update_fulfill_htlc may bump latest_monitor_id
+			let mut additional_update = self.build_commitment_no_status_check(logger);
+			// build_commitment_no_status_check and get_update_fulfill_htlc may bump latest_monitor_id
 			// but we want them to be strictly increasing by one, so reset it here.
 			self.latest_monitor_update_id = monitor_update.update_id;
 			monitor_update.updates.append(&mut additional_update.updates);
@@ -3269,16 +3269,11 @@ impl<Signer: Sign> Channel<Signer> {
 				log_bytes!(self.channel_id()), if update_fee.is_some() { "a fee update, " } else { "" },
 				update_add_htlcs.len(), update_fulfill_htlcs.len(), update_fail_htlcs.len());
 
-			Ok((Some((msgs::CommitmentUpdate {
-				update_add_htlcs,
-				update_fulfill_htlcs,
-				update_fail_htlcs,
-				update_fail_malformed_htlcs: Vec::new(),
-				update_fee,
-				commitment_signed,
-			}, monitor_update)), htlcs_to_fail))
+			self.monitor_updating_paused(false, true, false, Vec::new(), Vec::new(), Vec::new());
+			self.pending_monitor_updates.push(monitor_update);
+			(Some(self.pending_monitor_updates.last().unwrap()), htlcs_to_fail)
 		} else {
-			Ok((None, Vec::new()))
+			(None, Vec::new())
 		}
 	}
 
@@ -3488,17 +3483,9 @@ impl<Signer: Sign> Channel<Signer> {
 			return Ok((Vec::new(), self.pending_monitor_updates.last().unwrap()));
 		}
 
-		match self.free_holding_cell_htlcs(logger)? {
-			(Some((mut commitment_update, mut additional_update)), htlcs_to_fail) => {
-				commitment_update.update_fail_htlcs.reserve(update_fail_htlcs.len());
-				for fail_msg in update_fail_htlcs.drain(..) {
-					commitment_update.update_fail_htlcs.push(fail_msg);
-				}
-				commitment_update.update_fail_malformed_htlcs.reserve(update_fail_malformed_htlcs.len());
-				for fail_msg in update_fail_malformed_htlcs.drain(..) {
-					commitment_update.update_fail_malformed_htlcs.push(fail_msg);
-				}
-
+		match self.free_holding_cell_htlcs(logger) {
+			(Some(_), htlcs_to_fail) => {
+				let mut additional_update = self.pending_monitor_updates.pop().unwrap();
 				// free_holding_cell_htlcs may bump latest_monitor_id multiple times but we want them to be
 				// strictly increasing by one, so decrement it here.
 				self.latest_monitor_update_id = monitor_update.update_id;
@@ -5815,8 +5802,11 @@ impl<Signer: Sign> Channel<Signer> {
 
 	/// Begins the shutdown process, getting a message for the remote peer and returning all
 	/// holding cell HTLCs for payment failure.
+	///
+	/// May jump to the channel being fully shutdown (see [`Self::is_shutdown`]) in which case no
+	/// [`ChannelMonitorUpdate`] will be returned).
 	pub fn get_shutdown<K: Deref>(&mut self, keys_provider: &K, their_features: &InitFeatures, target_feerate_sats_per_kw: Option<u32>)
-	-> Result<(msgs::Shutdown, Option<ChannelMonitorUpdate>, Vec<(HTLCSource, PaymentHash)>), APIError>
+	-> Result<(msgs::Shutdown, Option<&ChannelMonitorUpdate>, Vec<(HTLCSource, PaymentHash)>), APIError>
 	where K::Target: KeysInterface {
 		for htlc in self.pending_outbound_htlcs.iter() {
 			if let OutboundHTLCState::LocalAnnounced(_) = htlc.state {
@@ -5859,12 +5849,15 @@ impl<Signer: Sign> Channel<Signer> {
 
 		let monitor_update = if update_shutdown_script {
 			self.latest_monitor_update_id += 1;
-			Some(ChannelMonitorUpdate {
+			let monitor_update = ChannelMonitorUpdate {
 				update_id: self.latest_monitor_update_id,
 				updates: vec![ChannelMonitorUpdateStep::ShutdownScript {
 					scriptpubkey: self.get_closing_scriptpubkey(),
 				}],
-			})
+			};
+			self.monitor_updating_paused(false, false, false, Vec::new(), Vec::new(), Vec::new());
+			self.pending_monitor_updates.push(monitor_update);
+			Some(self.pending_monitor_updates.last().unwrap())
 		} else { None };
 		let shutdown = msgs::Shutdown {
 			channel_id: self.channel_id,
@@ -5885,6 +5878,9 @@ impl<Signer: Sign> Channel<Signer> {
 			}
 		});
 
+		debug_assert!(!self.is_shutdown() || monitor_update.is_none(),
+			"we can't both complete shutdown and return a monitor update");
+
 		Ok((shutdown, monitor_update, dropped_outbound_htlcs))
 	}
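
A borrow-ordering consequence of returning `Option<&ChannelMonitorUpdate>` shows up in the `ChannelManager` changes below: the reference borrows the channel, so anything else the caller needs from the channel (for example the funding outpoint) has to be captured before the call. A rough sketch of that constraint, using illustrative stand-in types rather than LDK's real definitions:

	// Illustrative stand-ins -- not LDK's real definitions.
	#[derive(Clone, Copy)]
	struct OutPoint;
	struct ChannelMonitorUpdate { update_id: u64 }

	struct Channel {
		funding_txo: Option<OutPoint>,
		pending_monitor_updates: Vec<ChannelMonitorUpdate>,
	}

	impl Channel {
		// Returned by value, so callers can stash it before taking a longer borrow.
		fn get_funding_txo(&self) -> Option<OutPoint> { self.funding_txo }

		// Queues an update and returns a reference into the queue; the channel stays
		// borrowed for as long as the caller holds that reference.
		fn get_shutdown(&mut self) -> Option<&ChannelMonitorUpdate> {
			let update_id = self.pending_monitor_updates.len() as u64 + 1;
			self.pending_monitor_updates.push(ChannelMonitorUpdate { update_id });
			self.pending_monitor_updates.last()
		}
	}

	fn update_channel(_funding_txo: OutPoint, update: &ChannelMonitorUpdate) {
		// ...hand the update to the persister
		let _ = update.update_id;
	}

	fn close(chan: &mut Channel) {
		// Capture the funding outpoint *before* `get_shutdown`: afterwards `chan` is
		// still borrowed through the returned update reference, so calling
		// `chan.get_funding_txo()` in the call below would not compile.
		let funding_txo_opt = chan.get_funding_txo();
		if let Some(monitor_update) = chan.get_shutdown() {
			update_channel(funding_txo_opt.expect("funded channel"), monitor_update);
		}
	}

	fn main() {
		let mut chan = Channel { funding_txo: Some(OutPoint), pending_monitor_updates: Vec::new() };
		close(&mut chan);
	}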

lightning/src/ln/channelmanager.rs

Lines changed: 37 additions & 50 deletions
@@ -1947,7 +1947,8 @@ impl<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<M, T, K, F
 				if *counterparty_node_id != chan_entry.get().get_counterparty_node_id(){
 					return Err(APIError::APIMisuseError { err: "The passed counterparty_node_id doesn't match the channel's counterparty node_id".to_owned() });
 				}
-				let (shutdown_msg, monitor_update, htlcs) = {
+				let funding_txo_opt = chan_entry.get().get_funding_txo();
+				let (shutdown_msg, mut monitor_update_opt, htlcs) = {
 					let per_peer_state = self.per_peer_state.read().unwrap();
 					match per_peer_state.get(&counterparty_node_id) {
 						Some(peer_state) => {
@@ -1960,22 +1961,21 @@ impl<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<M, T, K, F
 				};
 				failed_htlcs = htlcs;
 
-				// Update the monitor with the shutdown script if necessary.
-				if let Some(monitor_update) = monitor_update {
-					let update_res = self.chain_monitor.update_channel(chan_entry.get().get_funding_txo().unwrap(), &monitor_update);
-					let (result, is_permanent) =
-						handle_monitor_update_res!(self, update_res, chan_entry.get_mut(), RAACommitmentOrder::CommitmentFirst, chan_entry.key(), NO_UPDATE);
-					if is_permanent {
-						remove_channel!(self, chan_entry);
-						break result;
-					}
-				}
-
+				// We can send the `shutdown` message before updating the `ChannelMonitor`
+				// here as we don't need the monitor update to complete until we send a
+				// `shutdown_signed`, which we'll delay if we're pending a monitor update.
 				channel_state.pending_msg_events.push(events::MessageSendEvent::SendShutdown {
 					node_id: *counterparty_node_id,
-					msg: shutdown_msg
+					msg: shutdown_msg,
 				});
 
+				// Update the monitor with the shutdown script if necessary.
+				if let Some(monitor_update) = monitor_update_opt.take() {
+					let update_id = monitor_update.update_id;
+					let update_res = self.chain_monitor.update_channel(funding_txo_opt.unwrap(), monitor_update);
+					break handle_new_monitor_update!(self, update_res, update_id, channel_state_lock, channel_state.pending_msg_events, chan_entry);
+				}
+
 				if chan_entry.get().is_shutdown() {
 					let channel = remove_channel!(self, chan_entry);
 					if let Ok(channel_update) = self.get_channel_update_for_broadcast(&channel) {
@@ -5527,48 +5527,35 @@ impl<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<M, T, K, F
 		let mut has_monitor_update = false;
 		let mut failed_htlcs = Vec::new();
 		let mut handle_errors = Vec::new();
-		{
+		'outer_loop: loop {
 			let mut channel_state_lock = self.channel_state.lock().unwrap();
 			let channel_state = &mut *channel_state_lock;
-			let by_id = &mut channel_state.by_id;
-			let pending_msg_events = &mut channel_state.pending_msg_events;
 
-			by_id.retain(|channel_id, chan| {
-				match chan.maybe_free_holding_cell_htlcs(&self.logger) {
-					Ok((commitment_opt, holding_cell_failed_htlcs)) => {
-						if !holding_cell_failed_htlcs.is_empty() {
-							failed_htlcs.push((
-								holding_cell_failed_htlcs,
-								*channel_id,
-								chan.get_counterparty_node_id()
-							));
-						}
-						if let Some((commitment_update, monitor_update)) = commitment_opt {
-							match self.chain_monitor.update_channel(chan.get_funding_txo().unwrap(), &monitor_update) {
-								ChannelMonitorUpdateStatus::Completed => {
-									pending_msg_events.push(events::MessageSendEvent::UpdateHTLCs {
-										node_id: chan.get_counterparty_node_id(),
-										updates: commitment_update,
-									});
-								},
-								e => {
-									has_monitor_update = true;
-									let (res, close_channel) = handle_monitor_update_res!(self, e, chan, RAACommitmentOrder::CommitmentFirst, channel_id, COMMITMENT_UPDATE_ONLY);
-									handle_errors.push((chan.get_counterparty_node_id(), res));
-									if close_channel { return false; }
-								},
-							}
-						}
-						true
-					},
-					Err(e) => {
-						let (close_channel, res) = convert_chan_err!(self, e, chan, channel_id);
-						handle_errors.push((chan.get_counterparty_node_id(), Err(res)));
-						// ChannelClosed event is generated by handle_error for us
-						!close_channel
+			for (channel_id, chan) in channel_state.by_id.iter_mut() {
+				let counterparty_node_id = chan.get_counterparty_node_id();
+				let funding_txo = chan.get_funding_txo();
+				let (monitor_opt, holding_cell_failed_htlcs) =
+					chan.maybe_free_holding_cell_htlcs(&self.logger);
+				if !holding_cell_failed_htlcs.is_empty() {
+					failed_htlcs.push((holding_cell_failed_htlcs, *channel_id, counterparty_node_id));
+				}
+				if let Some(monitor_update) = monitor_opt {
+					has_monitor_update = true;
+
+					let update_res = self.chain_monitor.update_channel(
+						funding_txo.expect("channel is live"), monitor_update);
+					let update_id = monitor_update.update_id;
+					let channel_id: [u8; 32] = *channel_id;
+					let res = handle_new_monitor_update!(self, update_res, update_id,
						channel_state, channel_state.pending_msg_events, chan, MANUALLY_REMOVING,
+						channel_state.by_id.remove(&channel_id));
+					if res.is_err() {
+						handle_errors.push((counterparty_node_id, res));
 					}
+					continue 'outer_loop;
 				}
-			});
+			}
+			break 'outer_loop;
 		}
 
 		let has_update = has_monitor_update || !failed_htlcs.is_empty() || !handle_errors.is_empty();
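
One structural choice in the hunk above is worth spelling out: handling a freed `ChannelMonitorUpdate` may remove the channel from the `by_id` map being iterated, so instead of continuing with a potentially invalidated iterator the loop restarts via `continue 'outer_loop` and exits with `break 'outer_loop` once a pass finds no pending work. A rough, borrow-safe sketch of that restart pattern with simplified stand-in types (not the real `ChannelManager` state):

	use std::collections::HashMap;

	// Simplified stand-ins: the map value is just a "has a pending update" flag.
	type ChannelId = [u8; 32];

	// Hypothetical handler: returns true if processing the update closed the channel.
	fn handle_update_and_maybe_close(_channel_id: &ChannelId) -> bool { false }

	fn drain_pending_updates(by_id: &mut HashMap<ChannelId, bool>) {
		'outer_loop: loop {
			// Find one channel with work to do.
			let pending = by_id.iter()
				.find(|(_, has_update)| **has_update)
				.map(|(id, _)| *id);
			match pending {
				Some(channel_id) => {
					// Handling the update may remove the entry, which would invalidate
					// any live iterator over `by_id` -- so mutate outside the iteration
					// and restart the scan from scratch.
					if handle_update_and_maybe_close(&channel_id) {
						by_id.remove(&channel_id);
					} else if let Some(flag) = by_id.get_mut(&channel_id) {
						*flag = false;
					}
					continue 'outer_loop;
				}
				None => break 'outer_loop,
			}
		}
	}

	fn main() {
		let mut by_id: HashMap<ChannelId, bool> = HashMap::new();
		by_id.insert([0u8; 32], true);
		drain_pending_updates(&mut by_id);
		assert_eq!(by_id.get(&[0u8; 32]), Some(&false));
	}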
