@@ -620,8 +620,6 @@ pub type SimpleRefChannelManager<'a, 'b, 'c, 'd, 'e, 'f, 'g, 'h, M, T, F, L> = C
620
620
// | |
621
621
// | |__`best_block`
622
622
// | |
623
- // | |__`pending_peers_awaiting_removal`
624
- // | |
625
623
// | |__`pending_events`
626
624
// | |
627
625
// | |__`pending_background_events`
@@ -789,16 +787,6 @@ where
789
787
790
788
/// See `ChannelManager` struct-level documentation for lock order requirements.
791
789
pending_events : Mutex < Vec < events:: Event > > ,
792
- /// When a peer disconnects but still has channels, the peer's `peer_state` entry in the
793
- /// `per_peer_state` is not removed by the `peer_disconnected` function. If the channels of
794
- /// to that peer is later closed while still being disconnected (i.e. force closed), we
795
- /// therefore need to remove the peer from `peer_state` separately.
796
- /// To avoid having to take the `per_peer_state` `write` lock once the channels are closed, we
797
- /// instead store such peers awaiting removal in this field, and remove them on a timer to
798
- /// limit the negative effects on parallelism as much as possible.
799
- ///
800
- /// See `ChannelManager` struct-level documentation for lock order requirements.
801
- pending_peers_awaiting_removal : Mutex < HashSet < PublicKey > > ,
802
790
/// See `ChannelManager` struct-level documentation for lock order requirements.
803
791
pending_background_events : Mutex < Vec < BackgroundEvent > > ,
804
792
/// Used when we have to take a BIG lock to make sure everything is self-consistent.
@@ -1359,11 +1347,10 @@ macro_rules! try_chan_entry {
1359
1347
}
1360
1348
1361
1349
macro_rules! remove_channel {
1362
- ( $self: expr, $entry: expr, $peer_state : expr ) => {
1350
+ ( $self: expr, $entry: expr) => {
1363
1351
{
1364
1352
let channel = $entry. remove_entry( ) . 1 ;
1365
1353
update_maps_on_chan_removal!( $self, channel) ;
1366
- $self. add_pending_peer_to_be_removed( channel. get_counterparty_node_id( ) , $peer_state) ;
1367
1354
channel
1368
1355
}
1369
1356
}
@@ -1536,7 +1523,6 @@ where
1536
1523
per_peer_state : FairRwLock :: new ( HashMap :: new ( ) ) ,
1537
1524
1538
1525
pending_events : Mutex :: new ( Vec :: new ( ) ) ,
1539
- pending_peers_awaiting_removal : Mutex :: new ( HashSet :: new ( ) ) ,
1540
1526
pending_background_events : Mutex :: new ( Vec :: new ( ) ) ,
1541
1527
total_consistency_lock : RwLock :: new ( ( ) ) ,
1542
1528
persistence_notifier : Notifier :: new ( ) ,
@@ -1803,7 +1789,7 @@ where
1803
1789
let ( result, is_permanent) =
1804
1790
handle_monitor_update_res ! ( self , update_res, chan_entry. get_mut( ) , RAACommitmentOrder :: CommitmentFirst , chan_entry. key( ) , NO_UPDATE ) ;
1805
1791
if is_permanent {
1806
- remove_channel ! ( self , chan_entry, peer_state ) ;
1792
+ remove_channel ! ( self , chan_entry) ;
1807
1793
break result;
1808
1794
}
1809
1795
}
@@ -1814,7 +1800,7 @@ where
1814
1800
} ) ;
1815
1801
1816
1802
if chan_entry. get ( ) . is_shutdown ( ) {
1817
- let channel = remove_channel ! ( self , chan_entry, peer_state ) ;
1803
+ let channel = remove_channel ! ( self , chan_entry) ;
1818
1804
if let Ok ( channel_update) = self . get_channel_update_for_broadcast ( & channel) {
1819
1805
peer_state. pending_msg_events . push ( events:: MessageSendEvent :: BroadcastChannelUpdate {
1820
1806
msg : channel_update
@@ -1917,7 +1903,7 @@ where
1917
1903
} else {
1918
1904
self . issue_channel_close_events ( chan. get ( ) , ClosureReason :: HolderForceClosed ) ;
1919
1905
}
1920
- remove_channel ! ( self , chan, peer_state )
1906
+ remove_channel ! ( self , chan)
1921
1907
} else {
1922
1908
return Err ( APIError :: ChannelUnavailable { err : format ! ( "Channel with id {} not found for the passed counterparty node_id {}" , log_bytes!( * channel_id) , peer_node_id) } ) ;
1923
1909
}
@@ -1956,13 +1942,6 @@ where
1956
1942
}
1957
1943
}
1958
1944
1959
- fn add_pending_peer_to_be_removed ( & self , counterparty_node_id : PublicKey , peer_state : & mut PeerState < <SP :: Target as SignerProvider >:: Signer > ) {
1960
- let peer_should_be_removed = !peer_state. is_connected && peer_state. channel_by_id . len ( ) == 0 ;
1961
- if peer_should_be_removed {
1962
- self . pending_peers_awaiting_removal . lock ( ) . unwrap ( ) . insert ( counterparty_node_id) ;
1963
- }
1964
- }
1965
-
1966
1945
/// Force closes a channel, immediately broadcasting the latest local transaction(s) and
1967
1946
/// rejecting new HTLCs on the given channel. Fails if `channel_id` is unknown to
1968
1947
/// the manager, or if the `counterparty_node_id` isn't the counterparty of the corresponding
@@ -3442,16 +3421,20 @@ where
3442
3421
true
3443
3422
}
3444
3423
3445
- /// Removes peers which have been been added to `pending_peers_awaiting_removal` which are
3446
- /// still disconnected and we have no channels to.
3424
+ /// When a peer disconnects but still has channels, the peer's `peer_state` entry in the
3425
+ /// `per_peer_state` is not removed by the `peer_disconnected` function. If the channels of
3426
+ /// to that peer is later closed while still being disconnected (i.e. force closed), we
3427
+ /// therefore need to remove the peer from `peer_state` separately.
3428
+ /// To avoid having to take the `per_peer_state` `write` lock once the channels are closed, we
3429
+ /// instead remove such peers awaiting removal through this function, which is called on a
3430
+ /// timer through `timer_tick_occurred`, passing the peers disconnected peers with no channels,
3431
+ /// to limit the negative effects on parallelism as much as possible.
3447
3432
///
3448
3433
/// Must be called without the `per_peer_state` lock acquired.
3449
- fn remove_peers_awaiting_removal ( & self ) {
3450
- let mut pending_peers_awaiting_removal = HashSet :: new ( ) ;
3451
- mem:: swap ( & mut * self . pending_peers_awaiting_removal . lock ( ) . unwrap ( ) , & mut pending_peers_awaiting_removal) ;
3434
+ fn remove_peers_awaiting_removal ( & self , pending_peers_awaiting_removal : HashSet < PublicKey > ) {
3452
3435
if pending_peers_awaiting_removal. len ( ) > 0 {
3453
3436
let mut per_peer_state = self . per_peer_state . write ( ) . unwrap ( ) ;
3454
- for counterparty_node_id in pending_peers_awaiting_removal. drain ( ) {
3437
+ for counterparty_node_id in pending_peers_awaiting_removal {
3455
3438
match per_peer_state. entry ( counterparty_node_id) {
3456
3439
hash_map:: Entry :: Occupied ( entry) => {
3457
3440
// Remove the entry if the peer is still disconnected and we still
@@ -3530,6 +3513,7 @@ where
3530
3513
/// the channel.
3531
3514
/// * Expiring a channel's previous `ChannelConfig` if necessary to only allow forwarding HTLCs
3532
3515
/// with the current `ChannelConfig`.
3516
+ /// * Removing peers which have disconnected but and no longer have any channels.
3533
3517
///
3534
3518
/// Note that this may cause reentrancy through `chain::Watch::update_channel` calls or feerate
3535
3519
/// estimate fetches.
@@ -3542,6 +3526,7 @@ where
3542
3526
3543
3527
let mut handle_errors: Vec < ( Result < ( ) , _ > , _ ) > = Vec :: new ( ) ;
3544
3528
let mut timed_out_mpp_htlcs = Vec :: new ( ) ;
3529
+ let mut pending_peers_awaiting_removal = HashSet :: new ( ) ;
3545
3530
{
3546
3531
let per_peer_state = self . per_peer_state . read ( ) . unwrap ( ) ;
3547
3532
for ( counterparty_node_id, peer_state_mutex) in per_peer_state. iter ( ) {
@@ -3589,10 +3574,13 @@ where
3589
3574
3590
3575
true
3591
3576
} ) ;
3592
- self . add_pending_peer_to_be_removed ( counterparty_node_id, peer_state) ;
3577
+ let peer_should_be_removed = !peer_state. is_connected && peer_state. channel_by_id . len ( ) == 0 ;
3578
+ if peer_should_be_removed {
3579
+ pending_peers_awaiting_removal. insert ( counterparty_node_id) ;
3580
+ }
3593
3581
}
3594
3582
}
3595
- self . remove_peers_awaiting_removal ( ) ;
3583
+ self . remove_peers_awaiting_removal ( pending_peers_awaiting_removal ) ;
3596
3584
3597
3585
self . claimable_payments . lock ( ) . unwrap ( ) . claimable_htlcs . retain ( |payment_hash, ( _, htlcs) | {
3598
3586
if htlcs. is_empty ( ) {
@@ -4347,7 +4335,7 @@ where
4347
4335
}
4348
4336
} ;
4349
4337
peer_state. pending_msg_events . push ( send_msg_err_event) ;
4350
- let _ = remove_channel ! ( self , channel, peer_state ) ;
4338
+ let _ = remove_channel ! ( self , channel) ;
4351
4339
return Err ( APIError :: APIMisuseError { err : "Please use accept_inbound_channel_from_trusted_peer_0conf to accept channels with zero confirmations." . to_owned ( ) } ) ;
4352
4340
}
4353
4341
@@ -4633,7 +4621,7 @@ where
4633
4621
let ( result, is_permanent) =
4634
4622
handle_monitor_update_res ! ( self , update_res, chan_entry. get_mut( ) , RAACommitmentOrder :: CommitmentFirst , chan_entry. key( ) , NO_UPDATE ) ;
4635
4623
if is_permanent {
4636
- remove_channel ! ( self , chan_entry, peer_state ) ;
4624
+ remove_channel ! ( self , chan_entry) ;
4637
4625
break result;
4638
4626
}
4639
4627
}
@@ -4682,7 +4670,7 @@ where
4682
4670
// also implies there are no pending HTLCs left on the channel, so we can
4683
4671
// fully delete it from tracking (the channel monitor is still around to
4684
4672
// watch for old state broadcasts)!
4685
- ( tx, Some ( remove_channel ! ( self , chan_entry, peer_state ) ) )
4673
+ ( tx, Some ( remove_channel ! ( self , chan_entry) ) )
4686
4674
} else { ( tx, None ) }
4687
4675
} ,
4688
4676
hash_map:: Entry :: Vacant ( _) => return Err ( MsgHandleErrInternal :: send_err_msg_no_close ( format ! ( "Got a message for a channel from the wrong node! No such channel for the passed counterparty_node_id {}" , counterparty_node_id) , msg. channel_id ) )
@@ -5185,11 +5173,12 @@ where
5185
5173
if let Some ( peer_state_mutex) = per_peer_state. get ( & counterparty_node_id) {
5186
5174
let mut peer_state_lock = peer_state_mutex. lock ( ) . unwrap ( ) ;
5187
5175
let peer_state = & mut * peer_state_lock;
5176
+ let pending_msg_events = & mut peer_state. pending_msg_events ;
5188
5177
if let hash_map:: Entry :: Occupied ( chan_entry) = peer_state. channel_by_id . entry ( funding_outpoint. to_channel_id ( ) ) {
5189
- let mut chan = remove_channel ! ( self , chan_entry, peer_state ) ;
5178
+ let mut chan = remove_channel ! ( self , chan_entry) ;
5190
5179
failed_channels. push ( chan. force_shutdown ( false ) ) ;
5191
5180
if let Ok ( update) = self . get_channel_update_for_broadcast ( & chan) {
5192
- peer_state . pending_msg_events . push ( events:: MessageSendEvent :: BroadcastChannelUpdate {
5181
+ pending_msg_events. push ( events:: MessageSendEvent :: BroadcastChannelUpdate {
5193
5182
msg : update
5194
5183
} ) ;
5195
5184
}
@@ -5199,7 +5188,7 @@ where
5199
5188
ClosureReason :: CommitmentTxConfirmed
5200
5189
} ;
5201
5190
self . issue_channel_close_events ( & chan, reason) ;
5202
- peer_state . pending_msg_events . push ( events:: MessageSendEvent :: HandleError {
5191
+ pending_msg_events. push ( events:: MessageSendEvent :: HandleError {
5203
5192
node_id : chan. get_counterparty_node_id ( ) ,
5204
5193
action : msgs:: ErrorAction :: SendErrorMessage {
5205
5194
msg : msgs:: ErrorMessage { channel_id : chan. channel_id ( ) , data : "Channel force-closed" . to_owned ( ) }
@@ -5241,7 +5230,7 @@ where
5241
5230
{
5242
5231
let per_peer_state = self . per_peer_state . read ( ) . unwrap ( ) ;
5243
5232
5244
- for ( cp_id , peer_state_mutex) in per_peer_state. iter ( ) {
5233
+ for ( _cp_id , peer_state_mutex) in per_peer_state. iter ( ) {
5245
5234
let mut peer_state_lock = peer_state_mutex. lock ( ) . unwrap ( ) ;
5246
5235
let peer_state = & mut * peer_state_lock;
5247
5236
let pending_msg_events = & mut peer_state. pending_msg_events ;
@@ -5281,7 +5270,6 @@ where
5281
5270
}
5282
5271
}
5283
5272
} ) ;
5284
- self . add_pending_peer_to_be_removed ( * cp_id, peer_state) ;
5285
5273
}
5286
5274
}
5287
5275
@@ -5306,7 +5294,7 @@ where
5306
5294
{
5307
5295
let per_peer_state = self . per_peer_state . read ( ) . unwrap ( ) ;
5308
5296
5309
- for ( cp_id , peer_state_mutex) in per_peer_state. iter ( ) {
5297
+ for ( _cp_id , peer_state_mutex) in per_peer_state. iter ( ) {
5310
5298
let mut peer_state_lock = peer_state_mutex. lock ( ) . unwrap ( ) ;
5311
5299
let peer_state = & mut * peer_state_lock;
5312
5300
let pending_msg_events = & mut peer_state. pending_msg_events ;
@@ -5344,7 +5332,6 @@ where
5344
5332
}
5345
5333
}
5346
5334
} ) ;
5347
- self . add_pending_peer_to_be_removed ( * cp_id, peer_state) ;
5348
5335
}
5349
5336
}
5350
5337
@@ -5926,7 +5913,7 @@ where
5926
5913
let mut timed_out_htlcs = Vec :: new ( ) ;
5927
5914
{
5928
5915
let per_peer_state = self . per_peer_state . read ( ) . unwrap ( ) ;
5929
- for ( cp_id , peer_state_mutex) in per_peer_state. iter ( ) {
5916
+ for ( _cp_id , peer_state_mutex) in per_peer_state. iter ( ) {
5930
5917
let mut peer_state_lock = peer_state_mutex. lock ( ) . unwrap ( ) ;
5931
5918
let peer_state = & mut * peer_state_lock;
5932
5919
let pending_msg_events = & mut peer_state. pending_msg_events ;
@@ -6010,7 +5997,6 @@ where
6010
5997
}
6011
5998
true
6012
5999
} ) ;
6013
- self . add_pending_peer_to_be_removed ( * cp_id, peer_state) ;
6014
6000
}
6015
6001
}
6016
6002
@@ -6338,7 +6324,7 @@ where
6338
6324
6339
6325
let per_peer_state = self . per_peer_state . read ( ) . unwrap ( ) ;
6340
6326
6341
- for ( cp_id , peer_state_mutex) in per_peer_state. iter ( ) {
6327
+ for ( _cp_id , peer_state_mutex) in per_peer_state. iter ( ) {
6342
6328
let mut peer_state_lock = peer_state_mutex. lock ( ) . unwrap ( ) ;
6343
6329
let peer_state = & mut * peer_state_lock;
6344
6330
let pending_msg_events = & mut peer_state. pending_msg_events ;
@@ -6370,7 +6356,6 @@ where
6370
6356
}
6371
6357
retain
6372
6358
} ) ;
6373
- self . add_pending_peer_to_be_removed ( * cp_id, peer_state) ;
6374
6359
}
6375
6360
//TODO: Also re-broadcast announcement_signatures
6376
6361
Ok ( ( ) )
@@ -6884,8 +6869,6 @@ where
6884
6869
6885
6870
write_ver_prefix ! ( writer, SERIALIZATION_VERSION , MIN_SERIALIZATION_VERSION ) ;
6886
6871
6887
- self . remove_peers_awaiting_removal ( ) ;
6888
-
6889
6872
self . genesis_hash . write ( writer) ?;
6890
6873
{
6891
6874
let best_block = self . best_block . read ( ) . unwrap ( ) ;
@@ -7711,7 +7694,6 @@ where
7711
7694
per_peer_state : FairRwLock :: new ( per_peer_state) ,
7712
7695
7713
7696
pending_events : Mutex :: new ( pending_events_read) ,
7714
- pending_peers_awaiting_removal : Mutex :: new ( HashSet :: new ( ) ) ,
7715
7697
pending_background_events : Mutex :: new ( pending_background_events_read) ,
7716
7698
total_consistency_lock : RwLock :: new ( ( ) ) ,
7717
7699
persistence_notifier : Notifier :: new ( ) ,
@@ -8196,9 +8178,6 @@ mod tests {
8196
8178
// Assert that nodes[1] is awaiting removal for nodes[0] once nodes[1] has been
8197
8179
// disconnected and the channel between has been force closed.
8198
8180
let nodes_0_per_peer_state = nodes[ 0 ] . node . per_peer_state . read ( ) . unwrap ( ) ;
8199
- let nodes_0_pending_peers_awaiting_removal = nodes[ 0 ] . node . pending_peers_awaiting_removal . lock ( ) . unwrap ( ) ;
8200
- assert_eq ! ( nodes_0_pending_peers_awaiting_removal. len( ) , 1 ) ;
8201
- assert ! ( nodes_0_pending_peers_awaiting_removal. get( & nodes[ 1 ] . node. get_our_node_id( ) ) . is_some( ) ) ;
8202
8181
// Assert that nodes[1] isn't removed before `timer_tick_occurred` has been executed.
8203
8182
assert_eq ! ( nodes_0_per_peer_state. len( ) , 1 ) ;
8204
8183
assert ! ( nodes_0_per_peer_state. get( & nodes[ 1 ] . node. get_our_node_id( ) ) . is_some( ) ) ;
@@ -8209,7 +8188,6 @@ mod tests {
8209
8188
{
8210
8189
// Assert that nodes[1] has now been removed.
8211
8190
assert_eq ! ( nodes[ 0 ] . node. per_peer_state. read( ) . unwrap( ) . len( ) , 0 ) ;
8212
- assert_eq ! ( nodes[ 0 ] . node. pending_peers_awaiting_removal. lock( ) . unwrap( ) . len( ) , 0 ) ;
8213
8191
}
8214
8192
}
8215
8193
0 commit comments