@@ -1279,6 +1279,10 @@ where
1279
1279
1280
1280
pending_offers_messages: Mutex<Vec<PendingOnionMessage<OffersMessage>>>,
1281
1281
1282
+ /// Tracks the channel_update message that were not broadcasted because
1283
+ /// we were not connected to any peers.
1284
+ pending_broadcast_messages: Mutex<Vec<MessageSendEvent>>,
1285
+
1282
1286
entropy_source: ES,
1283
1287
node_signer: NS,
1284
1288
signer_provider: SP,
@@ -2360,6 +2364,7 @@ where
2360
2364
funding_batch_states: Mutex::new(BTreeMap::new()),
2361
2365
2362
2366
pending_offers_messages: Mutex::new(Vec::new()),
2367
+ pending_broadcast_messages: Mutex::new(Vec::new()),
2363
2368
2364
2369
entropy_source,
2365
2370
node_signer,
@@ -2862,15 +2867,39 @@ where
2862
2867
};
2863
2868
if let Some(update) = update_opt {
2864
2869
// Try to send the `BroadcastChannelUpdate` to the peer we just force-closed on, but if
2865
- // not try to broadcast it via whatever peer we have.
2870
+ // not try to broadcast it via whatever peer we are connected to.
2871
+ let brodcast_message_evt = events::MessageSendEvent::BroadcastChannelUpdate {
2872
+ msg: update
2873
+ };
2874
+
2866
2875
let per_peer_state = self.per_peer_state.read().unwrap();
2867
- let a_peer_state_opt = per_peer_state.get(peer_node_id)
2868
- .ok_or(per_peer_state.values().next());
2869
- if let Ok(a_peer_state_mutex) = a_peer_state_opt {
2870
- let mut a_peer_state = a_peer_state_mutex.lock().unwrap();
2871
- a_peer_state.pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
2872
- msg: update
2873
- });
2876
+
2877
+ // Attempt to get the a_peer_state_mutex for the peer we force-closed on.
2878
+ let a_peer_state_mutex_opt = per_peer_state
2879
+ .get(peer_node_id)
2880
+ .map(|v| v);
2881
+
2882
+ // If the particular peer is not present, select any random connected peer from the ones we are connected to.
2883
+ let a_peer_state_mutex_opt = a_peer_state_mutex_opt.or_else(|| {
2884
+ per_peer_state
2885
+ .iter()
2886
+ .find(|(_, v)| v.lock().unwrap().is_connected)
2887
+ .map(|(_, v)| v)
2888
+ });
2889
+
2890
+ match a_peer_state_mutex_opt {
2891
+ Some(a_peer_state_mutex) => {
2892
+ // Handle the case where a connected peer is found.
2893
+ let mut a_peer_state = a_peer_state_mutex.lock().unwrap();
2894
+ a_peer_state.pending_msg_events.push(brodcast_message_evt);
2895
+ }
2896
+ None => {
2897
+ // Handle the case where no connected peer is found.
2898
+ let mut pending_broadcast_messages = self.pending_broadcast_messages.lock().unwrap();
2899
+ pending_broadcast_messages.push(brodcast_message_evt);
2900
+ log_info!(self.logger, "Not able to broadcast channel_update of force-closed channel right now.
2901
+ Will try rebroadcasting later.");
2902
+ }
2874
2903
}
2875
2904
}
2876
2905
@@ -4731,6 +4760,7 @@ where
4731
4760
4732
4761
{
4733
4762
let per_peer_state = self.per_peer_state.read().unwrap();
4763
+
4734
4764
for (counterparty_node_id, peer_state_mutex) in per_peer_state.iter() {
4735
4765
let mut peer_state_lock = peer_state_mutex.lock().unwrap();
4736
4766
let peer_state = &mut *peer_state_lock;
@@ -9006,6 +9036,23 @@ where
9006
9036
msg: chan.get_channel_reestablish(&self.logger),
9007
9037
});
9008
9038
});
9039
+
9040
+ {
9041
+ // Get pending messages to be broadcasted.
9042
+ let broadcast_evts = self.pending_broadcast_messages.lock().unwrap();
9043
+
9044
+ // If we have some pending message to broadcast, and we are connected to peers.
9045
+ if broadcast_evts.len() > 0 && per_peer_state.len() > 0 {
9046
+ let a_peer_state_mutex = per_peer_state.values().next().unwrap();
9047
+ let mut a_peer_state = a_peer_state_mutex.lock().unwrap();
9048
+
9049
+ a_peer_state.pending_msg_events.extend(broadcast_evts.iter().cloned());
9050
+
9051
+ self.pending_broadcast_messages.lock().unwrap().clear();
9052
+
9053
+ return NotifyOption::DoPersist;
9054
+ }
9055
+ }
9009
9056
}
9010
9057
9011
9058
return NotifyOption::SkipPersistHandleEvents;
@@ -10999,6 +11046,8 @@ where
10999
11046
11000
11047
pending_offers_messages: Mutex::new(Vec::new()),
11001
11048
11049
+ pending_broadcast_messages: Mutex::new(Vec::new()),
11050
+
11002
11051
entropy_source: args.entropy_source,
11003
11052
node_signer: args.node_signer,
11004
11053
signer_provider: args.signer_provider,
0 commit comments