Commit cf5ecd5

Always process holding cell ChannelMonitorUpdates asynchronously
We currently have two codepaths on most channel update functions - most methods return a set of messages to send a peer iff the `ChannelMonitorUpdate` succeeds, but if it does not we push the messages back into the `Channel` and then pull them back out when the `ChannelMonitorUpdate` completes and send them then. This adds a substantial amount of complexity in very critical codepaths.

Instead, here we swap all our channel update codepaths to immediately set the channel-update-required flag and only return a `ChannelMonitorUpdate` to the `ChannelManager`. Internally in the `Channel` we store a queue of `ChannelMonitorUpdate`s, which will become critical in future work to surface pending `ChannelMonitorUpdate`s to users at startup so they can complete.

This leaves some redundant work in `Channel` to be cleaned up later. Specifically, we still generate the messages which we will now ignore and regenerate later.

This commit updates the `ChannelMonitorUpdate` pipeline when freeing the holding cell, including when initiating shutdown.
1 parent 5fcaf46 commit cf5ecd5
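
To make the queue-then-complete flow described above concrete, here is a minimal, self-contained sketch. The names (`Chan`, `MonitorUpdate`, `queue_monitor_update`, `monitor_update_completed`) are hypothetical stand-ins rather than LDK's actual `Channel`/`ChannelMonitorUpdate` API, and the real methods return a reference to the queued update rather than a bare id.

    // Hypothetical stand-ins; not LDK's real types.
    struct MonitorUpdate {
        update_id: u64,
    }

    #[derive(Default)]
    struct Chan {
        latest_update_id: u64,
        // Updates are queued here instead of being handed out together with the
        // peer messages; the messages are regenerated once persistence completes.
        pending_monitor_updates: Vec<MonitorUpdate>,
    }

    impl Chan {
        // Stands in for e.g. freeing the holding cell or queueing a shutdown
        // script update: bump the id, queue the update, tell the caller about it.
        fn queue_monitor_update(&mut self) -> u64 {
            self.latest_update_id += 1;
            self.pending_monitor_updates.push(MonitorUpdate { update_id: self.latest_update_id });
            self.latest_update_id
        }

        // Called once the manager learns the update has been persisted; at this
        // point the real Channel would regenerate and release the deferred
        // commitment/shutdown messages.
        fn monitor_update_completed(&mut self, update_id: u64) {
            self.pending_monitor_updates.retain(|upd| upd.update_id > update_id);
        }
    }

    fn main() {
        let mut chan = Chan::default();
        let update_id = chan.queue_monitor_update();
        // The manager persists the update asynchronously, then signals completion:
        chan.monitor_update_completed(update_id);
        assert!(chan.pending_monitor_updates.is_empty());
    }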

File tree: 3 files changed, 78 additions and 83 deletions

lightning/src/ln/chanmon_update_fail_tests.rs

Lines changed: 9 additions & 1 deletion
@@ -2600,7 +2600,15 @@ fn test_permanent_error_during_sending_shutdown() {
 	chanmon_cfgs[0].persister.set_update_ret(ChannelMonitorUpdateStatus::PermanentFailure);
 
 	assert!(nodes[0].node.close_channel(&channel_id, &nodes[1].node.get_our_node_id()).is_ok());
-	check_closed_broadcast!(nodes[0], true);
+
+	// We always send the `shutdown` response when initiating a shutdown, even if we immediately
+	// close the channel thereafter.
+	let msg_events = nodes[0].node.get_and_clear_pending_msg_events();
+	assert_eq!(msg_events.len(), 3);
+	if let MessageSendEvent::SendShutdown { .. } = msg_events[0] {} else { panic!(); }
+	if let MessageSendEvent::BroadcastChannelUpdate { .. } = msg_events[1] {} else { panic!(); }
+	if let MessageSendEvent::HandleError { .. } = msg_events[2] {} else { panic!(); }
+
 	check_added_monitors!(nodes[0], 2);
 	check_closed_event!(nodes[0], 1, ClosureReason::ProcessingError { err: "ChannelMonitor storage failure".to_string() });
 }

lightning/src/ln/channel.rs

Lines changed: 27 additions & 30 deletions
@@ -3234,16 +3234,16 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
 	/// Public version of the below, checking relevant preconditions first.
 	/// If we're not in a state where freeing the holding cell makes sense, this is a no-op and
 	/// returns `(None, Vec::new())`.
-	pub fn maybe_free_holding_cell_htlcs<L: Deref>(&mut self, logger: &L) -> Result<(Option<(msgs::CommitmentUpdate, ChannelMonitorUpdate)>, Vec<(HTLCSource, PaymentHash)>), ChannelError> where L::Target: Logger {
+	pub fn maybe_free_holding_cell_htlcs<L: Deref>(&mut self, logger: &L) -> (Option<&ChannelMonitorUpdate>, Vec<(HTLCSource, PaymentHash)>) where L::Target: Logger {
 		if self.channel_state >= ChannelState::ChannelReady as u32 &&
 			(self.channel_state & (ChannelState::AwaitingRemoteRevoke as u32 | ChannelState::PeerDisconnected as u32 | ChannelState::MonitorUpdateInProgress as u32)) == 0 {
 			self.free_holding_cell_htlcs(logger)
-		} else { Ok((None, Vec::new())) }
+		} else { (None, Vec::new()) }
 	}
 
 	/// Frees any pending commitment updates in the holding cell, generating the relevant messages
 	/// for our counterparty.
-	fn free_holding_cell_htlcs<L: Deref>(&mut self, logger: &L) -> Result<(Option<(msgs::CommitmentUpdate, ChannelMonitorUpdate)>, Vec<(HTLCSource, PaymentHash)>), ChannelError> where L::Target: Logger {
+	fn free_holding_cell_htlcs<L: Deref>(&mut self, logger: &L) -> (Option<&ChannelMonitorUpdate>, Vec<(HTLCSource, PaymentHash)>) where L::Target: Logger {
 		assert_eq!(self.channel_state & ChannelState::MonitorUpdateInProgress as u32, 0);
 		if self.holding_cell_htlc_updates.len() != 0 || self.holding_cell_update_fee.is_some() {
 			log_trace!(logger, "Freeing holding cell with {} HTLC updates{} in channel {}", self.holding_cell_htlc_updates.len(),
@@ -3324,16 +3324,16 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
 			}
 		}
 		if update_add_htlcs.is_empty() && update_fulfill_htlcs.is_empty() && update_fail_htlcs.is_empty() && self.holding_cell_update_fee.is_none() {
-			return Ok((None, htlcs_to_fail));
+			return (None, htlcs_to_fail);
 		}
 		let update_fee = if let Some(feerate) = self.holding_cell_update_fee.take() {
 			self.send_update_fee(feerate, false, logger)
 		} else {
 			None
 		};
 
-		let (commitment_signed, mut additional_update) = self.send_commitment_no_status_check(logger)?;
-		// send_commitment_no_status_check and get_update_fulfill_htlc may bump latest_monitor_id
+		let mut additional_update = self.build_commitment_no_status_check(logger);
+		// build_commitment_no_status_check and get_update_fulfill_htlc may bump latest_monitor_id
 		// but we want them to be strictly increasing by one, so reset it here.
 		self.latest_monitor_update_id = monitor_update.update_id;
 		monitor_update.updates.append(&mut additional_update.updates);
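
As an aside on the two context lines just above: resetting `latest_monitor_update_id` and appending `additional_update.updates` folds the freshly built commitment update into the holding-cell update so the pair consumes a single update id. A rough sketch of that merge, with hypothetical stand-in types rather than the real `ChannelMonitorUpdate`:

    struct MonitorUpdate {
        update_id: u64,
        updates: Vec<String>, // stand-in for ChannelMonitorUpdateStep values
    }

    struct Chan {
        latest_monitor_update_id: u64,
    }

    impl Chan {
        fn fold_updates(&mut self, base: &mut MonitorUpdate, additional: &mut MonitorUpdate) {
            // `additional` consumed another id when it was built; roll the counter
            // back to the base update's id and absorb the extra steps, so monitor
            // update ids stay strictly increasing by one.
            self.latest_monitor_update_id = base.update_id;
            base.updates.append(&mut additional.updates);
        }
    }

    fn main() {
        let mut chan = Chan { latest_monitor_update_id: 2 };
        let mut base = MonitorUpdate { update_id: 1, updates: vec!["fulfill HTLC".into()] };
        let mut additional = MonitorUpdate { update_id: 2, updates: vec!["latest commitment tx".into()] };
        chan.fold_updates(&mut base, &mut additional);
        assert_eq!((chan.latest_monitor_update_id, base.updates.len()), (1, 2));
    }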
@@ -3342,16 +3342,11 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
 			log_bytes!(self.channel_id()), if update_fee.is_some() { "a fee update, " } else { "" },
 			update_add_htlcs.len(), update_fulfill_htlcs.len(), update_fail_htlcs.len());
 
-		Ok((Some((msgs::CommitmentUpdate {
-			update_add_htlcs,
-			update_fulfill_htlcs,
-			update_fail_htlcs,
-			update_fail_malformed_htlcs: Vec::new(),
-			update_fee,
-			commitment_signed,
-		}, monitor_update)), htlcs_to_fail))
+		self.monitor_updating_paused(false, true, false, Vec::new(), Vec::new(), Vec::new());
+		self.pending_monitor_updates.push(monitor_update);
+		(Some(self.pending_monitor_updates.last().unwrap()), htlcs_to_fail)
 		} else {
-			Ok((None, Vec::new()))
+			(None, Vec::new())
 		}
 	}
 
@@ -3561,17 +3556,9 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
 			return Ok((Vec::new(), self.pending_monitor_updates.last().unwrap()));
 		}
 
-		match self.free_holding_cell_htlcs(logger)? {
-			(Some((mut commitment_update, mut additional_update)), htlcs_to_fail) => {
-				commitment_update.update_fail_htlcs.reserve(update_fail_htlcs.len());
-				for fail_msg in update_fail_htlcs.drain(..) {
-					commitment_update.update_fail_htlcs.push(fail_msg);
-				}
-				commitment_update.update_fail_malformed_htlcs.reserve(update_fail_malformed_htlcs.len());
-				for fail_msg in update_fail_malformed_htlcs.drain(..) {
-					commitment_update.update_fail_malformed_htlcs.push(fail_msg);
-				}
-
+		match self.free_holding_cell_htlcs(logger) {
+			(Some(_), htlcs_to_fail) => {
+				let mut additional_update = self.pending_monitor_updates.pop().unwrap();
 				// free_holding_cell_htlcs may bump latest_monitor_id multiple times but we want them to be
 				// strictly increasing by one, so decrement it here.
 				self.latest_monitor_update_id = monitor_update.update_id;
@@ -5948,8 +5935,12 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
 
 	/// Begins the shutdown process, getting a message for the remote peer and returning all
 	/// holding cell HTLCs for payment failure.
-	pub fn get_shutdown<SP: Deref>(&mut self, signer_provider: &SP, their_features: &InitFeatures, target_feerate_sats_per_kw: Option<u32>)
-	-> Result<(msgs::Shutdown, Option<ChannelMonitorUpdate>, Vec<(HTLCSource, PaymentHash)>), APIError>
+	///
+	/// May jump to the channel being fully shutdown (see [`Self::is_shutdown`]) in which case no
+	/// [`ChannelMonitorUpdate`] will be returned).
+	pub fn get_shutdown<SP: Deref>(&mut self, signer_provider: &SP, their_features: &InitFeatures,
+		target_feerate_sats_per_kw: Option<u32>)
+	-> Result<(msgs::Shutdown, Option<&ChannelMonitorUpdate>, Vec<(HTLCSource, PaymentHash)>), APIError>
 	where SP::Target: SignerProvider {
 		for htlc in self.pending_outbound_htlcs.iter() {
 			if let OutboundHTLCState::LocalAnnounced(_) = htlc.state {
@@ -5992,12 +5983,15 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
 
 		let monitor_update = if update_shutdown_script {
 			self.latest_monitor_update_id += 1;
-			Some(ChannelMonitorUpdate {
+			let monitor_update = ChannelMonitorUpdate {
 				update_id: self.latest_monitor_update_id,
 				updates: vec![ChannelMonitorUpdateStep::ShutdownScript {
 					scriptpubkey: self.get_closing_scriptpubkey(),
 				}],
-			})
+			};
+			self.monitor_updating_paused(false, false, false, Vec::new(), Vec::new(), Vec::new());
+			self.pending_monitor_updates.push(monitor_update);
+			Some(self.pending_monitor_updates.last().unwrap())
 		} else { None };
 		let shutdown = msgs::Shutdown {
 			channel_id: self.channel_id,
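
Note that this hunk, like the holding-cell one above, returns `Some(self.pending_monitor_updates.last().unwrap())` rather than a reference to the local value. Ownership forces that shape: the update is moved into the queue, so the only borrow that can be handed back is one taken through the queue itself. A tiny sketch of the pattern, using hypothetical types:

    struct MonitorUpdate {
        update_id: u64,
    }

    struct Chan {
        pending_monitor_updates: Vec<MonitorUpdate>,
    }

    impl Chan {
        fn push_and_borrow(&mut self, update: MonitorUpdate) -> &MonitorUpdate {
            // `update` is moved into the Vec here, so `&update` cannot be returned
            // afterwards; the reference has to be borrowed back out of the queue.
            self.pending_monitor_updates.push(update);
            self.pending_monitor_updates.last().unwrap()
        }
    }

    fn main() {
        let mut chan = Chan { pending_monitor_updates: Vec::new() };
        let queued = chan.push_and_borrow(MonitorUpdate { update_id: 1 });
        assert_eq!(queued.update_id, 1);
    }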
@@ -6018,6 +6012,9 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
 			}
 		});
 
+		debug_assert!(!self.is_shutdown() || monitor_update.is_none(),
+			"we can't both complete shutdown and return a monitor update");
+
 		Ok((shutdown, monitor_update, dropped_outbound_htlcs))
 	}
 
lightning/src/ln/channelmanager.rs

Lines changed: 42 additions & 52 deletions
@@ -1855,25 +1855,27 @@ where
 		let peer_state = &mut *peer_state_lock;
 		match peer_state.channel_by_id.entry(channel_id.clone()) {
 			hash_map::Entry::Occupied(mut chan_entry) => {
-				let (shutdown_msg, monitor_update, htlcs) = chan_entry.get_mut().get_shutdown(&self.signer_provider, &peer_state.latest_features, target_feerate_sats_per_1000_weight)?;
+				let funding_txo_opt = chan_entry.get().get_funding_txo();
+				let their_features = &peer_state.latest_features;
+				let (shutdown_msg, mut monitor_update_opt, htlcs) = chan_entry.get_mut()
+					.get_shutdown(&self.signer_provider, their_features, target_feerate_sats_per_1000_weight)?;
 				failed_htlcs = htlcs;
 
-				// Update the monitor with the shutdown script if necessary.
-				if let Some(monitor_update) = monitor_update {
-					let update_res = self.chain_monitor.update_channel(chan_entry.get().get_funding_txo().unwrap(), &monitor_update);
-					let (result, is_permanent) =
-						handle_monitor_update_res!(self, update_res, chan_entry.get_mut(), RAACommitmentOrder::CommitmentFirst, chan_entry.key(), NO_UPDATE);
-					if is_permanent {
-						remove_channel!(self, chan_entry);
-						break result;
-					}
-				}
-
+				// We can send the `shutdown` message before updating the `ChannelMonitor`
+				// here as we don't need the monitor update to complete until we send a
+				// `shutdown_signed`, which we'll delay if we're pending a monitor update.
 				peer_state.pending_msg_events.push(events::MessageSendEvent::SendShutdown {
 					node_id: *counterparty_node_id,
-					msg: shutdown_msg
+					msg: shutdown_msg,
 				});
 
+				// Update the monitor with the shutdown script if necessary.
+				if let Some(monitor_update) = monitor_update_opt.take() {
+					let update_id = monitor_update.update_id;
+					let update_res = self.chain_monitor.update_channel(funding_txo_opt.unwrap(), monitor_update);
+					break handle_new_monitor_update!(self, update_res, update_id, peer_state_lock, peer_state, chan_entry);
+				}
+
 				if chan_entry.get().is_shutdown() {
 					let channel = remove_channel!(self, chan_entry);
 					if let Ok(channel_update) = self.get_channel_update_for_broadcast(&channel) {
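
The `break handle_new_monitor_update!(...)` above, like the `break result;` it replaces, only works because the surrounding code (not shown in this hunk) runs inside a `loop` whose `break` value becomes the function's result. A generic sketch of that early-exit shape, with hypothetical names and without reproducing the macro body:

    fn do_close(needs_monitor_update: bool) -> Result<(), String> {
        let result = loop {
            if needs_monitor_update {
                // In the real code this is the value produced by the
                // handle_new_monitor_update! macro, which may be Ok or Err.
                break persist_update();
            }
            // No monitor update was needed; fall through to the success path.
            break Ok(());
        };
        result
    }

    fn persist_update() -> Result<(), String> {
        Ok(())
    }

    fn main() {
        assert!(do_close(true).is_ok());
        assert!(do_close(false).is_ok());
    }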
@@ -5252,49 +5254,37 @@ where
 		let mut has_monitor_update = false;
 		let mut failed_htlcs = Vec::new();
 		let mut handle_errors = Vec::new();
-		{
-			let per_peer_state = self.per_peer_state.read().unwrap();
+		let per_peer_state = self.per_peer_state.read().unwrap();
 
-			for (_cp_id, peer_state_mutex) in per_peer_state.iter() {
+		for (_cp_id, peer_state_mutex) in per_peer_state.iter() {
+			'chan_loop: loop {
 				let mut peer_state_lock = peer_state_mutex.lock().unwrap();
-				let peer_state = &mut *peer_state_lock;
-				let pending_msg_events = &mut peer_state.pending_msg_events;
-				peer_state.channel_by_id.retain(|channel_id, chan| {
-					match chan.maybe_free_holding_cell_htlcs(&self.logger) {
-						Ok((commitment_opt, holding_cell_failed_htlcs)) => {
-							if !holding_cell_failed_htlcs.is_empty() {
-								failed_htlcs.push((
-									holding_cell_failed_htlcs,
-									*channel_id,
-									chan.get_counterparty_node_id()
-								));
-							}
-							if let Some((commitment_update, monitor_update)) = commitment_opt {
-								match self.chain_monitor.update_channel(chan.get_funding_txo().unwrap(), &monitor_update) {
-									ChannelMonitorUpdateStatus::Completed => {
-										pending_msg_events.push(events::MessageSendEvent::UpdateHTLCs {
-											node_id: chan.get_counterparty_node_id(),
-											updates: commitment_update,
-										});
-									},
-									e => {
-										has_monitor_update = true;
-										let (res, close_channel) = handle_monitor_update_res!(self, e, chan, RAACommitmentOrder::CommitmentFirst, channel_id, COMMITMENT_UPDATE_ONLY);
-										handle_errors.push((chan.get_counterparty_node_id(), res));
-										if close_channel { return false; }
-									},
-								}
-							}
-							true
-						},
-						Err(e) => {
-							let (close_channel, res) = convert_chan_err!(self, e, chan, channel_id);
-							handle_errors.push((chan.get_counterparty_node_id(), Err(res)));
-							// ChannelClosed event is generated by handle_error for us
-							!close_channel
+				let peer_state: &mut PeerState<_> = &mut *peer_state_lock;
+				for (channel_id, chan) in peer_state.channel_by_id.iter_mut() {
+					let counterparty_node_id = chan.get_counterparty_node_id();
+					let funding_txo = chan.get_funding_txo();
+					let (monitor_opt, holding_cell_failed_htlcs) =
+						chan.maybe_free_holding_cell_htlcs(&self.logger);
+					if !holding_cell_failed_htlcs.is_empty() {
+						failed_htlcs.push((holding_cell_failed_htlcs, *channel_id, counterparty_node_id));
+					}
+					if let Some(monitor_update) = monitor_opt {
+						has_monitor_update = true;
+
+						let update_res = self.chain_monitor.update_channel(
+							funding_txo.expect("channel is live"), monitor_update);
+						let update_id = monitor_update.update_id;
+						let channel_id: [u8; 32] = *channel_id;
+						let res = handle_new_monitor_update!(self, update_res, update_id,
+							peer_state_lock, peer_state, chan, MANUALLY_REMOVING,
+							peer_state.channel_by_id.remove(&channel_id));
+						if res.is_err() {
+							handle_errors.push((counterparty_node_id, res));
 						}
+						continue 'chan_loop;
 					}
-				});
+				}
+				break 'chan_loop;
 			}
 		}
 
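The `'chan_loop` restructuring above replaces `retain` with an explicit scan that restarts after each handled update, since `handle_new_monitor_update!` with the `MANUALLY_REMOVING` expression may remove the channel from the map mid-iteration. A generic sketch of that labelled-loop restart pattern, using a hypothetical map and flag rather than LDK's types:

    use std::collections::HashMap;

    // Each entry maps a channel id to a "has pending holding-cell work" flag.
    fn process_all(channels: &mut HashMap<u32, bool>) {
        'chan_loop: loop {
            // Scan for one channel with pending work under the current lock.
            let pending_id = channels.iter()
                .find(|(_, needs_update)| **needs_update)
                .map(|(id, _)| *id);
            match pending_id {
                Some(id) => {
                    // Handling the update may remove the entry entirely, so no
                    // iterator is kept alive across this point; restart the scan.
                    if id % 2 == 0 {
                        channels.remove(&id);
                    } else {
                        channels.insert(id, false);
                    }
                    continue 'chan_loop;
                }
                None => break 'chan_loop,
            }
        }
    }

    fn main() {
        let mut channels: HashMap<u32, bool> =
            [(1, true), (2, true), (3, false)].into_iter().collect();
        process_all(&mut channels);
        assert!(channels.values().all(|&needs_update| !needs_update));
    }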