Skip to content

Commit 12dd28b

Browse files
Always remove disconnected peers with no channels
When a peer disconnects but still has channels, the peer's `peer_state` entry in the `per_peer_state` is not removed by the `peer_disconnected` function. If the channels to that peer are later closed while it is still disconnected (i.e. force closed), we therefore need to remove the peer from `peer_state` separately. To remove such peers separately, we push them to a separate HashSet that holds peers awaiting removal, and remove the peers on a timer to limit the negative effects on parallelism as much as possible.
1 parent c1d7c92 commit 12dd28b

File tree

1 file changed

+112
-16
lines changed

1 file changed

+112
-16
lines changed

lightning/src/ln/channelmanager.rs

Lines changed: 112 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -620,6 +620,8 @@ pub type SimpleRefChannelManager<'a, 'b, 'c, 'd, 'e, 'f, 'g, 'h, M, T, F, L> = C
620620
// | |
621621
// | |__`best_block`
622622
// | |
623+
// | |__`pending_peers_awaiting_removal`
624+
// | |
623625
// | |__`pending_events`
624626
// | |
625627
// | |__`pending_background_events`
@@ -787,6 +789,16 @@ where
787789

788790
/// See `ChannelManager` struct-level documentation for lock order requirements.
789791
pending_events: Mutex<Vec<events::Event>>,
792+
/// When a peer disconnects but still has channels, the peer's `peer_state` entry in the
793+
/// `per_peer_state` is not removed by the `peer_disconnected` function. If the channels
794+
/// to that peer are later closed while still being disconnected (i.e. force closed), we
795+
/// therefore need to remove the peer from `peer_state` separately.
796+
/// To avoid having to take the `per_peer_state` `write` lock once the channels are closed, we
797+
/// instead store such peers awaiting removal in this field, and remove them on a timer to
798+
/// limit the negative effects on parallelism as much as possible.
799+
///
800+
/// See `ChannelManager` struct-level documentation for lock order requirements.
801+
pending_peers_awaiting_removal: Mutex<HashSet<PublicKey>>,
790802
/// See `ChannelManager` struct-level documentation for lock order requirements.
791803
pending_background_events: Mutex<Vec<BackgroundEvent>>,
792804
/// Used when we have to take a BIG lock to make sure everything is self-consistent.
@@ -1347,10 +1359,11 @@ macro_rules! try_chan_entry {
13471359
}
13481360

13491361
macro_rules! remove_channel {
1350-
($self: expr, $entry: expr) => {
1362+
($self: expr, $entry: expr, $peer_state: expr) => {
13511363
{
13521364
let channel = $entry.remove_entry().1;
13531365
update_maps_on_chan_removal!($self, channel);
1366+
$self.add_pending_peer_to_be_removed(channel.get_counterparty_node_id(), $peer_state);
13541367
channel
13551368
}
13561369
}
@@ -1523,6 +1536,7 @@ where
15231536
per_peer_state: FairRwLock::new(HashMap::new()),
15241537

15251538
pending_events: Mutex::new(Vec::new()),
1539+
pending_peers_awaiting_removal: Mutex::new(HashSet::new()),
15261540
pending_background_events: Mutex::new(Vec::new()),
15271541
total_consistency_lock: RwLock::new(()),
15281542
persistence_notifier: Notifier::new(),
@@ -1789,7 +1803,7 @@ where
17891803
let (result, is_permanent) =
17901804
handle_monitor_update_res!(self, update_res, chan_entry.get_mut(), RAACommitmentOrder::CommitmentFirst, chan_entry.key(), NO_UPDATE);
17911805
if is_permanent {
1792-
remove_channel!(self, chan_entry);
1806+
remove_channel!(self, chan_entry, peer_state);
17931807
break result;
17941808
}
17951809
}
@@ -1800,7 +1814,7 @@ where
18001814
});
18011815

18021816
if chan_entry.get().is_shutdown() {
1803-
let channel = remove_channel!(self, chan_entry);
1817+
let channel = remove_channel!(self, chan_entry, peer_state);
18041818
if let Ok(channel_update) = self.get_channel_update_for_broadcast(&channel) {
18051819
peer_state.pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
18061820
msg: channel_update
@@ -1903,7 +1917,7 @@ where
19031917
} else {
19041918
self.issue_channel_close_events(chan.get(),ClosureReason::HolderForceClosed);
19051919
}
1906-
remove_channel!(self, chan)
1920+
remove_channel!(self, chan, peer_state)
19071921
} else {
19081922
return Err(APIError::ChannelUnavailable{ err: format!("Channel with id {} not found for the passed counterparty node_id {}", log_bytes!(*channel_id), peer_node_id) });
19091923
}
@@ -1942,6 +1956,13 @@ where
19421956
}
19431957
}
19441958

1959+
fn add_pending_peer_to_be_removed(&self, counterparty_node_id: PublicKey, peer_state: &mut PeerState<<SP::Target as SignerProvider>::Signer>) {
1960+
let peer_should_be_removed = !peer_state.is_connected && peer_state.channel_by_id.len() == 0;
1961+
if peer_should_be_removed {
1962+
self.pending_peers_awaiting_removal.lock().unwrap().insert(counterparty_node_id);
1963+
}
1964+
}
1965+
19451966
/// Force closes a channel, immediately broadcasting the latest local transaction(s) and
19461967
/// rejecting new HTLCs on the given channel. Fails if `channel_id` is unknown to
19471968
/// the manager, or if the `counterparty_node_id` isn't the counterparty of the corresponding
@@ -3421,6 +3442,34 @@ where
34213442
true
34223443
}
34233444

3445+
/// Removes peers which have been added to `pending_peers_awaiting_removal` which are
3446+
/// still disconnected and we have no channels to.
3447+
///
3448+
/// Must be called without the `per_peer_state` lock acquired.
3449+
fn remove_peers_awaiting_removal(&self) {
3450+
let mut pending_peers_awaiting_removal = HashSet::new();
3451+
mem::swap(&mut *self.pending_peers_awaiting_removal.lock().unwrap(), &mut pending_peers_awaiting_removal);
3452+
if pending_peers_awaiting_removal.len() > 0 {
3453+
let mut per_peer_state = self.per_peer_state.write().unwrap();
3454+
for counterparty_node_id in pending_peers_awaiting_removal.drain() {
3455+
match per_peer_state.entry(counterparty_node_id) {
3456+
hash_map::Entry::Occupied(entry) => {
3457+
// Remove the entry if the peer is still disconnected and we still
3458+
// have no channels to the peer.
3459+
let remove_entry = {
3460+
let peer_state = entry.get().lock().unwrap();
3461+
!peer_state.is_connected && peer_state.channel_by_id.len() == 0
3462+
};
3463+
if remove_entry {
3464+
entry.remove_entry();
3465+
}
3466+
},
3467+
hash_map::Entry::Vacant(_) => { /* The PeerState has already been removed */ }
3468+
}
3469+
}
3470+
}
3471+
}
3472+
34243473
#[cfg(any(test, feature = "_test_utils"))]
34253474
/// Process background events, for functional testing
34263475
pub fn test_process_background_events(&self) {
@@ -3499,13 +3548,14 @@ where
34993548
let mut peer_state_lock = peer_state_mutex.lock().unwrap();
35003549
let peer_state = &mut *peer_state_lock;
35013550
let pending_msg_events = &mut peer_state.pending_msg_events;
3551+
let counterparty_node_id = *counterparty_node_id;
35023552
peer_state.channel_by_id.retain(|chan_id, chan| {
35033553
let chan_needs_persist = self.update_channel_fee(chan_id, chan, new_feerate);
35043554
if chan_needs_persist == NotifyOption::DoPersist { should_persist = NotifyOption::DoPersist; }
35053555

35063556
if let Err(e) = chan.timer_check_closing_negotiation_progress() {
35073557
let (needs_close, err) = convert_chan_err!(self, e, chan, chan_id);
3508-
handle_errors.push((Err(err), *counterparty_node_id));
3558+
handle_errors.push((Err(err), counterparty_node_id));
35093559
if needs_close { return false; }
35103560
}
35113561

@@ -3539,8 +3589,10 @@ where
35393589

35403590
true
35413591
});
3592+
self.add_pending_peer_to_be_removed(counterparty_node_id, peer_state);
35423593
}
35433594
}
3595+
self.remove_peers_awaiting_removal();
35443596

35453597
self.claimable_payments.lock().unwrap().claimable_htlcs.retain(|payment_hash, (_, htlcs)| {
35463598
if htlcs.is_empty() {
@@ -4295,7 +4347,7 @@ where
42954347
}
42964348
};
42974349
peer_state.pending_msg_events.push(send_msg_err_event);
4298-
let _ = remove_channel!(self, channel);
4350+
let _ = remove_channel!(self, channel, peer_state);
42994351
return Err(APIError::APIMisuseError { err: "Please use accept_inbound_channel_from_trusted_peer_0conf to accept channels with zero confirmations.".to_owned() });
43004352
}
43014353

@@ -4581,7 +4633,7 @@ where
45814633
let (result, is_permanent) =
45824634
handle_monitor_update_res!(self, update_res, chan_entry.get_mut(), RAACommitmentOrder::CommitmentFirst, chan_entry.key(), NO_UPDATE);
45834635
if is_permanent {
4584-
remove_channel!(self, chan_entry);
4636+
remove_channel!(self, chan_entry, peer_state);
45854637
break result;
45864638
}
45874639
}
@@ -4630,7 +4682,7 @@ where
46304682
// also implies there are no pending HTLCs left on the channel, so we can
46314683
// fully delete it from tracking (the channel monitor is still around to
46324684
// watch for old state broadcasts)!
4633-
(tx, Some(remove_channel!(self, chan_entry)))
4685+
(tx, Some(remove_channel!(self, chan_entry, peer_state)))
46344686
} else { (tx, None) }
46354687
},
46364688
hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Got a message for a channel from the wrong node! No such channel for the passed counterparty_node_id {}", counterparty_node_id), msg.channel_id))
@@ -5133,12 +5185,11 @@ where
51335185
if let Some(peer_state_mutex) = per_peer_state.get(&counterparty_node_id) {
51345186
let mut peer_state_lock = peer_state_mutex.lock().unwrap();
51355187
let peer_state = &mut *peer_state_lock;
5136-
let pending_msg_events = &mut peer_state.pending_msg_events;
51375188
if let hash_map::Entry::Occupied(chan_entry) = peer_state.channel_by_id.entry(funding_outpoint.to_channel_id()) {
5138-
let mut chan = remove_channel!(self, chan_entry);
5189+
let mut chan = remove_channel!(self, chan_entry, peer_state);
51395190
failed_channels.push(chan.force_shutdown(false));
51405191
if let Ok(update) = self.get_channel_update_for_broadcast(&chan) {
5141-
pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
5192+
peer_state.pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
51425193
msg: update
51435194
});
51445195
}
@@ -5148,7 +5199,7 @@ where
51485199
ClosureReason::CommitmentTxConfirmed
51495200
};
51505201
self.issue_channel_close_events(&chan, reason);
5151-
pending_msg_events.push(events::MessageSendEvent::HandleError {
5202+
peer_state.pending_msg_events.push(events::MessageSendEvent::HandleError {
51525203
node_id: chan.get_counterparty_node_id(),
51535204
action: msgs::ErrorAction::SendErrorMessage {
51545205
msg: msgs::ErrorMessage { channel_id: chan.channel_id(), data: "Channel force-closed".to_owned() }
@@ -5190,7 +5241,7 @@ where
51905241
{
51915242
let per_peer_state = self.per_peer_state.read().unwrap();
51925243

5193-
for (_cp_id, peer_state_mutex) in per_peer_state.iter() {
5244+
for (cp_id, peer_state_mutex) in per_peer_state.iter() {
51945245
let mut peer_state_lock = peer_state_mutex.lock().unwrap();
51955246
let peer_state = &mut *peer_state_lock;
51965247
let pending_msg_events = &mut peer_state.pending_msg_events;
@@ -5230,6 +5281,7 @@ where
52305281
}
52315282
}
52325283
});
5284+
self.add_pending_peer_to_be_removed(*cp_id, peer_state);
52335285
}
52345286
}
52355287

@@ -5254,7 +5306,7 @@ where
52545306
{
52555307
let per_peer_state = self.per_peer_state.read().unwrap();
52565308

5257-
for (_cp_id, peer_state_mutex) in per_peer_state.iter() {
5309+
for (cp_id, peer_state_mutex) in per_peer_state.iter() {
52585310
let mut peer_state_lock = peer_state_mutex.lock().unwrap();
52595311
let peer_state = &mut *peer_state_lock;
52605312
let pending_msg_events = &mut peer_state.pending_msg_events;
@@ -5292,6 +5344,7 @@ where
52925344
}
52935345
}
52945346
});
5347+
self.add_pending_peer_to_be_removed(*cp_id, peer_state);
52955348
}
52965349
}
52975350

@@ -5873,7 +5926,7 @@ where
58735926
let mut timed_out_htlcs = Vec::new();
58745927
{
58755928
let per_peer_state = self.per_peer_state.read().unwrap();
5876-
for (_cp_id, peer_state_mutex) in per_peer_state.iter() {
5929+
for (cp_id, peer_state_mutex) in per_peer_state.iter() {
58775930
let mut peer_state_lock = peer_state_mutex.lock().unwrap();
58785931
let peer_state = &mut *peer_state_lock;
58795932
let pending_msg_events = &mut peer_state.pending_msg_events;
@@ -5957,6 +6010,7 @@ where
59576010
}
59586011
true
59596012
});
6013+
self.add_pending_peer_to_be_removed(*cp_id, peer_state);
59606014
}
59616015
}
59626016

@@ -6284,7 +6338,7 @@ where
62846338

62856339
let per_peer_state = self.per_peer_state.read().unwrap();
62866340

6287-
for (_cp_id, peer_state_mutex) in per_peer_state.iter() {
6341+
for (cp_id, peer_state_mutex) in per_peer_state.iter() {
62886342
let mut peer_state_lock = peer_state_mutex.lock().unwrap();
62896343
let peer_state = &mut *peer_state_lock;
62906344
let pending_msg_events = &mut peer_state.pending_msg_events;
@@ -6316,6 +6370,7 @@ where
63166370
}
63176371
retain
63186372
});
6373+
self.add_pending_peer_to_be_removed(*cp_id, peer_state);
63196374
}
63206375
//TODO: Also re-broadcast announcement_signatures
63216376
Ok(())
@@ -6829,6 +6884,8 @@ where
68296884

68306885
write_ver_prefix!(writer, SERIALIZATION_VERSION, MIN_SERIALIZATION_VERSION);
68316886

6887+
self.remove_peers_awaiting_removal();
6888+
68326889
self.genesis_hash.write(writer)?;
68336890
{
68346891
let best_block = self.best_block.read().unwrap();
@@ -7654,6 +7711,7 @@ where
76547711
per_peer_state: FairRwLock::new(per_peer_state),
76557712

76567713
pending_events: Mutex::new(pending_events_read),
7714+
pending_peers_awaiting_removal: Mutex::new(HashSet::new()),
76577715
pending_background_events: Mutex::new(pending_background_events_read),
76587716
total_consistency_lock: RwLock::new(()),
76597717
persistence_notifier: Notifier::new(),
@@ -8117,6 +8175,44 @@ mod tests {
81178175
}
81188176
}
81198177

8178+
#[test]
8179+
fn test_drop_disconnected_peers_when_removing_channels() {
8180+
let chanmon_cfgs = create_chanmon_cfgs(2);
8181+
let node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
8182+
let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
8183+
let nodes = create_network(2, &node_cfgs, &node_chanmgrs);
8184+
8185+
let chan = create_announced_chan_between_nodes(&nodes, 0, 1);
8186+
8187+
nodes[0].node.peer_disconnected(&nodes[1].node.get_our_node_id(), false);
8188+
nodes[1].node.peer_disconnected(&nodes[0].node.get_our_node_id(), false);
8189+
8190+
nodes[0].node.force_close_broadcasting_latest_txn(&chan.2, &nodes[1].node.get_our_node_id()).unwrap();
8191+
check_closed_broadcast!(nodes[0], true);
8192+
check_added_monitors!(nodes[0], 1);
8193+
check_closed_event!(nodes[0], 1, ClosureReason::HolderForceClosed);
8194+
8195+
{
8196+
// Assert that nodes[1] is awaiting removal for nodes[0] once nodes[1] has been
8197+
// disconnected and the channel between has been force closed.
8198+
let nodes_0_per_peer_state = nodes[0].node.per_peer_state.read().unwrap();
8199+
let nodes_0_pending_peers_awaiting_removal = nodes[0].node.pending_peers_awaiting_removal.lock().unwrap();
8200+
assert_eq!(nodes_0_pending_peers_awaiting_removal.len(), 1);
8201+
assert!(nodes_0_pending_peers_awaiting_removal.get(&nodes[1].node.get_our_node_id()).is_some());
8202+
// Assert that nodes[1] isn't removed before `timer_tick_occurred` has been executed.
8203+
assert_eq!(nodes_0_per_peer_state.len(), 1);
8204+
assert!(nodes_0_per_peer_state.get(&nodes[1].node.get_our_node_id()).is_some());
8205+
}
8206+
8207+
nodes[0].node.timer_tick_occurred();
8208+
8209+
{
8210+
// Assert that nodes[1] has now been removed.
8211+
assert_eq!(nodes[0].node.per_peer_state.read().unwrap().len(), 0);
8212+
assert_eq!(nodes[0].node.pending_peers_awaiting_removal.lock().unwrap().len(), 0);
8213+
}
8214+
}
8215+
81208216
#[test]
81218217
fn bad_inbound_payment_hash() {
81228218
// Add coverage for checking that a user-provided payment hash matches the payment secret.

0 commit comments

Comments
 (0)